SAMZA-226; auto-create changelog streams
diff --git a/.reviewboardrc b/.reviewboardrc
index 9339119..c561205 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -2,4 +2,4 @@
REPOSITORY = 'samza'
GUESS_DESCRIPTION = True
TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/master'
+TRACKING_BRANCH = 'origin/0.8.0'
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4266a13..06c5f05 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -783,6 +783,14 @@
</tr>
<tr>
+ <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
+ <td class="default">2</td>
+ <td class="description">
+ The property defines the number of replicas to use for the change log stream.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="regex-rewriter">
Consuming all Kafka topics matching a regular expression<br>
<span class="subtitle">
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java
index 2048e90..9f7ade0 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -26,6 +26,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Store and retrieve named, typed values as configuration for classes implementing this interface.
@@ -52,6 +54,21 @@
return new MapConfig(out);
}
+ public Config regexSubset(String regex) {
+ Map<String, String> out = new HashMap<String, String>();
+ Pattern pattern = Pattern.compile(regex);
+
+ for (Entry<String, String> entry : entrySet()) {
+ String k = entry.getKey();
+ Matcher matcher = pattern.matcher(k);
+ if(matcher.find()){
+ out.put(k, entry.getValue());
+ }
+ }
+
+ return new MapConfig(out);
+ }
+
public String get(String k, String defaultString) {
if (!containsKey(k)) {
return defaultString;
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 571c606..8995ba3 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -50,4 +50,11 @@
* requested in the parameter set.
*/
Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
+
+ /**
+ * An API to create a change log stream
+ * @param streamName The name of the stream to be created in the underlying stream
+ * @param numOfPartitions The number of partitions in the changelog stream
+ */
+ void createChangelogStream(String streamName, int numOfPartitions);
}
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 38e313f..01997ae 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -54,6 +55,11 @@
}
@Override
+ public void createChangelogStream(String streamName, int numOfPartitions){
+ throw new SamzaException("Method not implemented");
+ }
+
+ @Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d91d6d7..8a39766 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -275,10 +275,6 @@
info("Got change log system streams: %s" format changeLogSystemStreams)
- val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
-
- info("Got change log stream metadata: %s" format changeLogMetadata)
-
val serdeManager = new SerdeManager(
serdes = serdes,
systemKeySerdes = systemKeySerdes,
@@ -413,6 +409,12 @@
val containerContext = new SamzaContainerContext(containerName, config, taskNames)
+ // compute the number of partitions necessary for the change log stream creation
+ var maxChangeLogStreamPartitions = -1
+ taskNameToChangeLogPartitionMapping.map(x => if(maxChangeLogStreamPartitions < x._2) maxChangeLogStreamPartitions = x._2)
+ // Increment by 1 because partition starts from 0, but we need the absolute count, this value is used for change log topic creation.
+ maxChangeLogStreamPartitions = maxChangeLogStreamPartitions + 1
+
val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => {
debug("Setting up task instance: %s" format taskName)
@@ -466,18 +468,16 @@
info("Got task stores: %s" format taskStores)
- val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata)
-
- info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
-
val storageManager = new TaskStorageManager(
taskName = taskName,
taskStores = taskStores,
storeConsumers = storeConsumers,
changeLogSystemStreams = changeLogSystemStreams,
- changeLogOldestOffsets = changeLogOldestOffsets,
+ maxChangeLogStreamPartitions,
+ streamMetadataCache = streamMetadataCache,
storeBaseDir = storeBaseDir,
- partitionForThisTaskName)
+ partitionForThisTaskName,
+ systemAdmins = systemAdmins)
val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames))
@@ -519,16 +519,6 @@
reporters = reporters,
jvm = jvm)
}
-
- /**
- * Builds a map from SystemStreamPartition to oldest offset for changelogs.
- */
- def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
- inputStreamMetadata
- .mapValues(_.getSystemStreamPartitionMetadata.get(partition))
- .filter(_._2 != null)
- .mapValues(_.getOldestOffset)
- }
}
class SamzaContainer(
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index b8719c3..af0928e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -23,10 +23,7 @@
import scala.collection.Map
import org.apache.samza.util.Logging
import org.apache.samza.Partition
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionIterator
+import org.apache.samza.system._
import org.apache.samza.util.Util
import org.apache.samza.SamzaException
import org.apache.samza.container.TaskName
@@ -50,16 +47,20 @@
taskStores: Map[String, StorageEngine] = Map(),
storeConsumers: Map[String, SystemConsumer] = Map(),
changeLogSystemStreams: Map[String, SystemStream] = Map(),
- changeLogOldestOffsets: Map[SystemStream, String] = Map(),
+ changeLogStreamPartitions: Int,
+ streamMetadataCache: StreamMetadataCache,
storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
- partition: Partition) extends Logging {
+ partition: Partition,
+ systemAdmins: Map[String, SystemAdmin]) extends Logging {
var taskStoresToRestore = taskStores
+ var changeLogOldestOffsets: Map[SystemStream, String] = Map()
def apply(storageEngineName: String) = taskStores(storageEngineName)
def init {
cleanBaseDirs
+ createStreams
startConsumers
restoreStores
stopConsumers
@@ -67,7 +68,6 @@
private def cleanBaseDirs {
debug("Cleaning base directories for stores.")
-
taskStores.keys.foreach(storeName => {
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
@@ -78,6 +78,22 @@
})
}
+ private def createStreams = {
+ info("Creating streams that are not present for changelog")
+
+ for ((storeName, systemStream) <- changeLogSystemStreams) {
+ var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+ systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions)
+ }
+
+ val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
+ info("Got change log stream metadata: %s" format changeLogMetadata)
+
+ changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata)
+ info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
+ }
+
+
private def startConsumers {
debug("Starting consumers for stores.")
@@ -131,4 +147,15 @@
taskStores.values.foreach(_.stop)
}
+
+
+ /**
+ * Builds a map from SystemStreamPartition to oldest offset for changelogs.
+ */
+ private def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
+ inputStreamMetadata
+ .mapValues(_.getSystemStreamPartitionMetadata.get(partition))
+ .filter(_._2 != null)
+ .mapValues(_.getOldestOffset)
+ }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index 98e92bc..ec1d749 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -130,4 +130,8 @@
})
enterPosition
}
+
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+ throw new SamzaException("Method not implemented")
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a79ecca..cff2fd6 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -244,6 +244,10 @@
def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
Map[String, SystemStreamMetadata]()
+
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+ new SamzaException("Method not implemented")
+ }
}
}
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9fc1f56..235c881 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -19,11 +19,18 @@
package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+
import scala.collection.JavaConversions._
import kafka.consumer.ConsumerConfig
import java.util.Properties
import kafka.producer.ProducerConfig
import java.util.UUID
+import scala.collection.JavaConverters._
+import org.apache.samza.system.kafka.KafkaSystemFactory
+import scala.collection.mutable.ArrayBuffer
+
object KafkaConfig {
val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
@@ -34,6 +41,10 @@
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+ val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
+ val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka"
+ val CHANGELOG_STREAM_NAMES_REGEX = "stores\\..*\\.changelog$"
+
/**
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
@@ -84,6 +95,30 @@
def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+ def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
+ def getChangelogTopicNames() = {
+ val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
+ val topicNamesList = ArrayBuffer[String]()
+ for((changelogConfig, changelogName) <- changelogConfigs){
+ // Lookup the factory for this particular stream and verify if it's a kafka system
+ val changelogNameSplit = changelogName.split("\\.")
+ if(changelogNameSplit.length < 2) throw new SamzaException("Changelog name not in expected format")
+ val factoryName = config.get(String.format(SystemConfig.SYSTEM_FACTORY, changelogNameSplit(0)))
+ if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
+ topicNamesList += changelogNameSplit(1)
+ }
+ }
+ topicNamesList.toList
+ }
+
+ // Get all kafka properties for changelog stream topic creation
+ def getChangelogKafkaProperties(name: String) = {
+ val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
+
+ val kafkaChangeLogProperties = new Properties
+ filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)}
+ kafkaChangeLogProperties
+ }
// kafka config
def getKafkaSystemConsumerConfig(
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5ac33ea..6b5b9a4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -19,25 +19,21 @@
package org.apache.samza.system.kafka
+import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
import kafka.api._
import kafka.consumer.SimpleConsumer
-import kafka.utils.Utils
-import kafka.client.ClientUtils
-import kafka.common.TopicAndPartition
-import kafka.common.ErrorMapping
-import kafka.cluster.Broker
-import org.apache.samza.util.Logging
-import java.util.UUID
+import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping}
+import java.util.{Properties, UUID}
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import kafka.consumer.ConsumerConfig
+import kafka.admin.AdminUtils
object KafkaSystemAdmin extends Logging {
/**
@@ -73,40 +69,59 @@
}
/**
+ * A helper class that is used to construct the changelog stream specific information
+ * @param replicationFactor The number of replicas for the changelog stream
+ * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
+ */
+case class ChangeLogInfo(var replicationFactor: Int, var kafkaProps: Properties)
+
+/**
* A Kafka-based implementation of SystemAdmin.
*/
class KafkaSystemAdmin(
/**
- * The system name to use when creating SystemStreamPartitions to return in
- * the getSystemStreamMetadata responser.
- */
+ * The system name to use when creating SystemStreamPartitions to return in
+ * the getSystemStreamMetadata responser.
+ */
systemName: String,
-
// TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
/**
- * List of brokers that are part of the Kafka system that we wish to
- * interact with. The format is host1:port1,host2:port2.
- */
+ * List of brokers that are part of the Kafka system that we wish to
+ * interact with. The format is host1:port1,host2:port2.
+ */
brokerListString: String,
/**
- * The timeout to use for the simple consumer when fetching metadata from
- * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
- */
+ * The timeout to use for the simple consumer when fetching metadata from
+ * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
+ */
timeout: Int = Int.MaxValue,
/**
- * The buffer size to use for the simple consumer when fetching metadata
- * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
- * configuration.
- */
+ * The buffer size to use for the simple consumer when fetching metadata
+ * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
+ * configuration.
+ */
bufferSize: Int = ConsumerConfig.SocketBufferSize,
/**
- * The client ID to use for the simple consumer when fetching metadata from
- * Kafka. Equivalent to Kafka's client.id configuration.
+ * The client ID to use for the simple consumer when fetching metadata from
+ * Kafka. Equivalent to Kafka's client.id configuration.
+ */
+ clientId: String = UUID.randomUUID.toString,
+
+ /**
+ * A function that returns a Zookeeper client to connect to fetch the meta
+ * data information
+ */
+ connectZk: () => ZkClient,
+
+ /**
+ * Replication factor for the Changelog topic in kafka
+ * Kafka properties to be used during the Changelog topic creation
*/
- clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging {
+ topicMetaInformation: Map[String, ChangeLogInfo] = Map[String, ChangeLogInfo]()
+ ) extends SystemAdmin with Logging {
import KafkaSystemAdmin._
@@ -259,4 +274,82 @@
offsets
}
+
+ private def createTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) {
+ val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+ info("Attempting to create change log topic %s." format topicName)
+ info("Using partition count "+ numKafkaChangeLogPartitions + " for creating change log topic")
+ val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangeLogException("Unable to find topic information for topic " + topicName))
+ retryBackoff.run(
+ loop => {
+ val zkClient = connectZk()
+ try {
+ AdminUtils.createTopic(
+ zkClient,
+ topicName,
+ numKafkaChangeLogPartitions,
+ topicMetaInfo.replicationFactor,
+ topicMetaInfo.kafkaProps)
+ } finally {
+ zkClient.close
+ }
+
+ info("Created changelog topic %s." format topicName)
+ loop.done
+ },
+
+ (exception, loop) => {
+ exception match {
+ case e: TopicExistsException =>
+ info("Changelog topic %s already exists." format topicName)
+ loop.done
+ case e: Exception =>
+ warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
+ debug("Exception detail:", e)
+ }
+ }
+ )
+ }
+
+ private def validateTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) {
+ val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+ info("Validating changelog topic %s." format topicName)
+ retryBackoff.run(
+ loop => {
+ val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+ val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+ val topicMetadata = topicMetadataMap(topicName)
+ ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+
+ val partitionCount = topicMetadata.partitionsMetadata.length
+ if (partitionCount < numKafkaChangeLogPartitions) {
+ throw new KafkaChangeLogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format(topicName, topicMetadata.partitionsMetadata.length, numKafkaChangeLogPartitions))
+ }
+
+ info("Successfully validated changelog topic %s." format topicName)
+ loop.done
+ },
+
+ (exception, loop) => {
+ exception match {
+ case e: KafkaChangeLogException => throw e
+ case e: Exception =>
+ warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
+ debug("Exception detail:", e)
+ }
+ }
+ )
+ }
+
+ /**
+ * Exception to be thrown when the change log stream creation or validation has failed
+ */
+ class KafkaChangeLogException(s: String, t: Throwable) extends SamzaException(s, t) {
+ def this(s: String) = this(s, null)
+ }
+
+ override def createChangelogStream(topicName: String, numKafkaChangeLogPartitions: Int) = {
+ createTopicInKafka(topicName, numKafkaChangeLogPartitions)
+ validateTopicInKafka(topicName, numKafkaChangeLogPartitions)
+ }
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4ed5e88..9ab836c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,6 +19,9 @@
package org.apache.samza.system.kafka
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
import org.apache.samza.util.KafkaUtil
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
@@ -95,12 +98,23 @@
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
+ val topicNames = config.getChangelogTopicNames()
+ var topicMetaInformation = Map[String, ChangeLogInfo]()
+
+ // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
+ for(topicName <- topicNames) {
+ val replicationFactor = config.getChangelogStreamReplicationFactor(topicName).getOrElse("2").toInt
+ val changelogInfo = ChangeLogInfo(replicationFactor, config.getChangelogKafkaProperties(topicName))
+ topicMetaInformation += topicName -> changelogInfo
+ }
new KafkaSystemAdmin(
systemName,
brokerListString,
timeout,
bufferSize,
- clientId)
+ clientId,
+ () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer),
+ topicMetaInformation)
}
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index d660b91..f1b7511 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,11 +19,9 @@
package org.apache.samza.util
-import org.apache.samza.config.{KafkaConfig, Config, ConfigException}
+import org.apache.samza.config.{Config, ConfigException}
import org.apache.samza.config.JobConfig.Config2Job
import java.util.concurrent.atomic.AtomicLong
-import kafka.client.ClientUtils
-import org.apache.samza.SamzaException
object KafkaUtil {
val counter = new AtomicLong(0)
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 5ceb109..a5af465 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -40,6 +40,7 @@
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
+import org.apache.samza.config.MapConfig
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.SystemStreamPartition
@@ -164,6 +165,10 @@
class TestKafkaSystemAdmin {
import TestKafkaSystemAdmin._
+ val systemName = "test"
+ // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
+ val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+
def testShouldAssembleMetadata {
val oldestOffsets = Map(
new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1",
@@ -205,8 +210,6 @@
@Test
def testShouldGetOldestNewestAndNextOffsets {
- val systemName = "test"
- val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
// Create an empty topic with 50 partitions, but with no offsets.
createTopic
@@ -271,7 +274,7 @@
@Test
def testNonExistentTopic {
- val systemAdmin = new KafkaSystemAdmin("test", brokers)
+
val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
@@ -280,7 +283,6 @@
@Test
def testOffsetsAfter {
- val systemAdmin = new KafkaSystemAdmin("test", brokers)
val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
@@ -290,7 +292,7 @@
assertEquals("3", offsetsAfter(ssp2))
}
- class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
import kafka.api.{ TopicMetadata, TopicMetadataResponse }
// Simulate Kafka telling us that the leader for the topic is not available
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index fa1d51b..c0a20af 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
@@ -53,7 +54,12 @@
return metadata;
}
- @Override
+ @Override
+ public void createChangelogStream(String streamName, int numOfPartitions) {
+ throw new SamzaException("Method not implemented");
+ }
+
+ @Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 118f5ee..b23968b 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,6 +22,7 @@
import java.util.Properties
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import java.util.logging.{Level, LogManager, Handler, Logger}
import kafka.admin.AdminUtils
import kafka.common.ErrorMapping
@@ -87,6 +88,9 @@
val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+ props1.setProperty("auto.create.topics.enable","false")
+ props2.setProperty("auto.create.topics.enable","false")
+ props3.setProperty("auto.create.topics.enable","false")
val config = new java.util.Properties()
val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
@@ -106,6 +110,7 @@
@BeforeClass
def beforeSetupServers {
+
zookeeper = new EmbeddedZookeeper(zkConnect)
server1 = TestUtils.createServer(new KafkaConfig(props1))
server2 = TestUtils.createServer(new KafkaConfig(props2))
@@ -124,16 +129,10 @@
INPUT_TOPIC,
TOTAL_TASK_NAMES,
REPLICATION_FACTOR)
-
- AdminUtils.createTopic(
- zkClient,
- STATE_TOPIC,
- TOTAL_TASK_NAMES,
- REPLICATION_FACTOR)
}
def validateTopics {
- val topics = Set(STATE_TOPIC, INPUT_TOPIC)
+ val topics = Set(INPUT_TOPIC)
var done = false
var retries = 0
@@ -208,6 +207,7 @@
"stores.mystore.key.serde" -> "string",
"stores.mystore.msg.serde" -> "string",
"stores.mystore.changelog" -> "kafka.mystore",
+ "stores.mystore.changelog.replication.factor" -> "1",
"systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
// Always start consuming at offset 0. This avoids a race condition between