/*
* Copyright 2018 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSpoutMigration {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMigration.class);
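
    /**
     * Holds the settings read from the migration config file. The field names mirror the config keys read in
     * {@link #main(String[])}.
     */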
private static class Configuration {
private String zkHosts;
private String zkRoot;
private String spoutId;
private String topic;
private boolean isWildcardTopic;
private String kafkaBootstrapServers;
private String newSpoutConsumerGroup;
private int zkSessionTimeoutMs;
private int zkConnectionTimeoutMs;
private int zkRetryTimes;
private int zkRetryIntervalMs;
}
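
    /*
     * For reference, the config file passed to main is read with Utils.findAndReadConfigFile, so it is expected to be
     * YAML. A minimal sketch of such a file, with purely illustrative placeholder values, might look like:
     *
     *   zookeeper.servers: "zkhost1:2181,zkhost2:2181"
     *   zookeeper.root: "/kafka-spout-offsets"
     *   spout.id: "my-kafka-spout"
     *   topic: "my-topic"
     *   is.wildcard.topic: false
     *   kafka.bootstrap.servers: "kafkahost1:9092"
     *   new.spout.consumer.group: "my-new-consumer-group"
     *   zookeeper.session.timeout.ms: 20000
     *   zookeeper.connection.timeout.ms: 15000
     *   zookeeper.retry.times: 5
     *   zookeeper.retry.interval.ms: 1000
     */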
    /**
     * Migrates offsets from the Zookeeper store used by the storm-kafka non-Trident spouts to the Kafka offset store
     * used by the storm-kafka-client non-Trident spout.
     */
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Args: confFile");
System.exit(1);
}
Map<String, Object> conf = Utils.findAndReadConfigFile(args[0]);
Configuration configuration = new Configuration();
configuration.zkHosts = MapUtil.getOrError(conf, "zookeeper.servers");
configuration.zkRoot = MapUtil.getOrError(conf, "zookeeper.root");
configuration.spoutId = MapUtil.getOrError(conf, "spout.id");
configuration.topic = MapUtil.getOrError(conf, "topic");
configuration.isWildcardTopic = MapUtil.getOrError(conf, "is.wildcard.topic");
configuration.kafkaBootstrapServers = MapUtil.getOrError(conf, "kafka.bootstrap.servers");
configuration.newSpoutConsumerGroup = MapUtil.getOrError(conf, "new.spout.consumer.group");
configuration.zkSessionTimeoutMs = MapUtil.getOrError(conf, "zookeeper.session.timeout.ms");
configuration.zkConnectionTimeoutMs = MapUtil.getOrError(conf, "zookeeper.connection.timeout.ms");
configuration.zkRetryTimes = MapUtil.getOrError(conf, "zookeeper.retry.times");
configuration.zkRetryIntervalMs = MapUtil.getOrError(conf, "zookeeper.retry.interval.ms");
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(configuration);
LOG.info("Migrating offsets {}", offsetsToCommit);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.kafkaBootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.newSpoutConsumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
consumer.assign(offsetsToCommit.keySet());
consumer.commitSync(offsetsToCommit);
}
LOG.info("Migrated offsets {} to consumer group {}", offsetsToCommit, configuration.newSpoutConsumerGroup);
}
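
    /**
     * Reads the per-partition offset nodes under the given path. Each child node is expected to hold the JSON
     * metadata written by the old storm-kafka spout; only the "topic", "partition" and "offset" fields are used here.
     */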
private static Map<TopicPartition, OffsetAndMetadata> getOffsetsAtPath(
CuratorFramework curator, ObjectMapper objectMapper, String partitionsRoot) throws Exception {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
if (curator.checkExists().forPath(partitionsRoot) == null) {
throw new RuntimeException("No such path " + partitionsRoot);
}
List<String> partitionPaths = curator.getChildren().forPath(partitionsRoot);
for (String partitionPath : partitionPaths) {
String absPartitionPath = partitionsRoot + "/" + partitionPath;
LOG.info("Reading offset data from path {}", absPartitionPath);
byte[] partitionBytes = curator.getData().forPath(absPartitionPath);
Map<String, Object> partitionMetadata = objectMapper.readValue(partitionBytes, new TypeReference<Map<String, Object>>() {
});
String topic = (String) partitionMetadata.get("topic");
int partition = ((Number) partitionMetadata.get("partition")).intValue();
long offset = ((Number) partitionMetadata.get("offset")).longValue();
offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset));
}
return offsets;
}
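
    /**
     * Collects the offsets to commit from the Zookeeper tree under zkRoot/spoutId. For wildcard-topic spouts the
     * children of that path are per-topic directories matched against the configured topic pattern; otherwise the
     * partition nodes are expected to sit directly under the spout root.
     */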
private static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(Configuration configuration) throws Exception {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
try (CuratorFramework curator = newCurator(configuration)) {
curator.start();
ObjectMapper objectMapper = new ObjectMapper();
String spoutRoot = configuration.zkRoot + "/" + configuration.spoutId;
if (curator.checkExists().forPath(spoutRoot) == null) {
throw new RuntimeException("No such path " + spoutRoot);
}
if (configuration.isWildcardTopic) {
LOG.info("Expecting wildcard topics, looking for topics in {}", spoutRoot);
List<String> topicPaths = curator.getChildren().forPath(spoutRoot);
for (String topicPath : topicPaths) {
if (!topicPath.matches(configuration.topic)) {
LOG.info("Skipping directory {} because it doesn't match the topic pattern {}", topicPath, configuration.topic);
} else {
String absTopicPath = spoutRoot + "/" + topicPath;
LOG.info("Looking for partitions in {}", absTopicPath);
offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, absTopicPath));
}
}
} else {
LOG.info("Expecting exact topic match, looking for offsets in {}", spoutRoot);
offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, spoutRoot));
}
}
return offsetsToCommit;
}
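
    /**
     * Builds a Curator client from the Zookeeper connection settings in the migration config.
     */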
private static CuratorFramework newCurator(Configuration config) throws Exception {
return CuratorFrameworkFactory.newClient(config.zkHosts,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
new RetryNTimes(config.zkRetryTimes,
config.zkRetryIntervalMs));
}
}