Revert "SAMZA-226; auto-create changelog streams"

This reverts commit d7680cab1ca0c977c56e931548a80728af58578a.
diff --git a/.reviewboardrc b/.reviewboardrc
index c561205..9339119 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -2,4 +2,4 @@
 REPOSITORY = 'samza'
 GUESS_DESCRIPTION = True
 TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/0.8.0'
+TRACKING_BRANCH = 'origin/master'
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 06c5f05..4266a13 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -783,14 +783,6 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
-                    <td class="default">2</td>
-                    <td class="description">
-                        The property defines the number of replicas to use for the change log stream. 
-                    </td>
-                </tr>
-
-                <tr>
                     <th colspan="3" class="section" id="regex-rewriter">
                         Consuming all Kafka topics matching a regular expression<br>
                         <span class="subtitle">
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java
index 9f7ade0..2048e90 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -26,8 +26,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Store and retrieve named, typed values as configuration for classes implementing this interface.
@@ -54,21 +52,6 @@
     return new MapConfig(out);
   }
 
-  public Config regexSubset(String regex) {
-      Map<String, String> out = new HashMap<String, String>();
-      Pattern pattern = Pattern.compile(regex);
-
-      for (Entry<String, String> entry : entrySet()) {
-        String k = entry.getKey();
-        Matcher matcher = pattern.matcher(k);
-        if(matcher.find()){
-            out.put(k, entry.getValue());
-        }
-      }
-
-      return new MapConfig(out);
-  }
-
   public String get(String k, String defaultString) {
     if (!containsKey(k)) {
       return defaultString;
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8995ba3..571c606 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -50,11 +50,4 @@
    *         requested in the parameter set.
    */
   Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
-
-    /**
-     * An API to create a change log stream
-     * @param streamName The name of the stream to be created in the underlying stream
-     * @param numOfPartitions The number of partitions in the changelog stream
-     */
-  void createChangelogStream(String streamName, int numOfPartitions);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 01997ae..38e313f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -55,11 +54,6 @@
   }
 
   @Override
-  public void createChangelogStream(String streamName, int numOfPartitions){
-    throw new SamzaException("Method not implemented");
-  }
-
-  @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 8a39766..d91d6d7 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -275,6 +275,10 @@
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
+    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
+
+    info("Got change log stream metadata: %s" format changeLogMetadata)
+
     val serdeManager = new SerdeManager(
       serdes = serdes,
       systemKeySerdes = systemKeySerdes,
@@ -409,12 +413,6 @@
 
     val containerContext = new SamzaContainerContext(containerName, config, taskNames)
 
-    // compute the number of partitions necessary for the change log stream creation
-    var maxChangeLogStreamPartitions = -1
-    taskNameToChangeLogPartitionMapping.map(x => if(maxChangeLogStreamPartitions < x._2) maxChangeLogStreamPartitions = x._2)
-    // Increment by 1 because partition starts from 0, but we need the absolute count, this value is used for change log topic creation.
-    maxChangeLogStreamPartitions = maxChangeLogStreamPartitions + 1
-
     val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => {
       debug("Setting up task instance: %s" format taskName)
 
@@ -468,16 +466,18 @@
 
       info("Got task stores: %s" format taskStores)
 
+      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata)
+
+      info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
+
       val storageManager = new TaskStorageManager(
         taskName = taskName,
         taskStores = taskStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
-        maxChangeLogStreamPartitions,
-        streamMetadataCache = streamMetadataCache,
+        changeLogOldestOffsets = changeLogOldestOffsets,
         storeBaseDir = storeBaseDir,
-        partitionForThisTaskName,
-        systemAdmins = systemAdmins)
+        partitionForThisTaskName)
 
       val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames))
 
@@ -519,6 +519,16 @@
       reporters = reporters,
       jvm = jvm)
   }
+
+  /**
+   * Builds a map from SystemStreamPartition to oldest offset for changelogs.
+   */
+  def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
+    inputStreamMetadata
+      .mapValues(_.getSystemStreamPartitionMetadata.get(partition))
+      .filter(_._2 != null)
+      .mapValues(_.getOldestOffset)
+  }
 }
 
 class SamzaContainer(
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index af0928e..b8719c3 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -23,7 +23,10 @@
 import scala.collection.Map
 import org.apache.samza.util.Logging
 import org.apache.samza.Partition
-import org.apache.samza.system._
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionIterator
 import org.apache.samza.util.Util
 import org.apache.samza.SamzaException
 import org.apache.samza.container.TaskName
@@ -47,20 +50,16 @@
   taskStores: Map[String, StorageEngine] = Map(),
   storeConsumers: Map[String, SystemConsumer] = Map(),
   changeLogSystemStreams: Map[String, SystemStream] = Map(),
-  changeLogStreamPartitions: Int,
-  streamMetadataCache: StreamMetadataCache,
+  changeLogOldestOffsets: Map[SystemStream, String] = Map(),
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition,
-  systemAdmins: Map[String, SystemAdmin]) extends Logging {
+  partition: Partition) extends Logging {
 
   var taskStoresToRestore = taskStores
-  var changeLogOldestOffsets: Map[SystemStream, String] = Map()
 
   def apply(storageEngineName: String) = taskStores(storageEngineName)
 
   def init {
     cleanBaseDirs
-    createStreams
     startConsumers
     restoreStores
     stopConsumers
@@ -68,6 +67,7 @@
 
   private def cleanBaseDirs {
     debug("Cleaning base directories for stores.")
+
     taskStores.keys.foreach(storeName => {
       val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
 
@@ -78,22 +78,6 @@
     })
   }
 
-  private def createStreams = {
-    info("Creating streams that are not present for changelog")
-
-    for ((storeName, systemStream) <- changeLogSystemStreams) {
-      var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
-      systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions)
-    }
-
-    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
-    info("Got change log stream metadata: %s" format changeLogMetadata)
-
-    changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata)
-    info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
-  }
-
-
   private def startConsumers {
     debug("Starting consumers for stores.")
 
@@ -147,15 +131,4 @@
 
     taskStores.values.foreach(_.stop)
   }
-
-
-  /**
-   * Builds a map from SystemStreamPartition to oldest offset for changelogs.
-   */
-  private def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
-    inputStreamMetadata
-      .mapValues(_.getSystemStreamPartitionMetadata.get(partition))
-      .filter(_._2 != null)
-      .mapValues(_.getOldestOffset)
-  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index ec1d749..98e92bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -130,8 +130,4 @@
       })
     enterPosition
   }
-
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new SamzaException("Method not implemented")
-  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index cff2fd6..a79ecca 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -244,10 +244,6 @@
 
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
         Map[String, SystemStreamMetadata]()
-
-      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-        new SamzaException("Method not implemented")
-      }
     }
   }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 235c881..9fc1f56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -19,18 +19,11 @@
 
 package org.apache.samza.config
 
-
-import org.apache.samza.SamzaException
-
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
 import java.util.Properties
 import kafka.producer.ProducerConfig
 import java.util.UUID
-import scala.collection.JavaConverters._
-import org.apache.samza.system.kafka.KafkaSystemFactory
-import scala.collection.mutable.ArrayBuffer
-
 
 object KafkaConfig {
   val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
@@ -41,10 +34,6 @@
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
 
-  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
-  val CHANGELOG_STREAM_KAFKA_SETTINGS  = "stores.%s.changelog.kafka"
-  val CHANGELOG_STREAM_NAMES_REGEX = "stores\\..*\\.changelog$"
-
   /**
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
@@ -95,30 +84,6 @@
   def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
   def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
   def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
-  def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
-  def getChangelogTopicNames() = {
-    val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
-    val topicNamesList = ArrayBuffer[String]()
-    for((changelogConfig, changelogName) <- changelogConfigs){
-      // Lookup the factory for this particular stream and verify if it's a kafka system
-      val changelogNameSplit = changelogName.split("\\.")
-      if(changelogNameSplit.length < 2) throw new SamzaException("Changelog name not in expected format")
-      val factoryName = config.get(String.format(SystemConfig.SYSTEM_FACTORY, changelogNameSplit(0)))
-      if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
-        topicNamesList += changelogNameSplit(1)
-      }
-    }
-    topicNamesList.toList
-  }
-
-  // Get all kafka properties for changelog stream topic creation
-  def getChangelogKafkaProperties(name: String) = {
-    val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
-
-    val kafkaChangeLogProperties = new Properties
-    filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)}
-    kafkaChangeLogProperties
-  }
 
   // kafka config
   def getKafkaSystemConsumerConfig(
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 6b5b9a4..5ac33ea 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -19,21 +19,25 @@
 
 package org.apache.samza.system.kafka
 
-import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
 import kafka.api._
 import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping}
-import java.util.{Properties, UUID}
+import kafka.utils.Utils
+import kafka.client.ClientUtils
+import kafka.common.TopicAndPartition
+import kafka.common.ErrorMapping
+import kafka.cluster.Broker
+import org.apache.samza.util.Logging
+import java.util.UUID
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import kafka.consumer.ConsumerConfig
-import kafka.admin.AdminUtils
 
 object KafkaSystemAdmin extends Logging {
   /**
@@ -69,59 +73,40 @@
 }
 
 /**
- * A helper class that is used to construct the changelog stream specific information
- * @param replicationFactor The number of replicas for the changelog stream
- * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
- */
-case class ChangeLogInfo(var replicationFactor: Int, var kafkaProps: Properties)
-
-/**
  * A Kafka-based implementation of SystemAdmin.
  */
 class KafkaSystemAdmin(
   /**
-  * The system name to use when creating SystemStreamPartitions to return in
-  * the getSystemStreamMetadata responser.
-  */
+   * The system name to use when creating SystemStreamPartitions to return in
+   * the getSystemStreamMetadata responser.
+   */
   systemName: String,
+
   // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
   /**
-  * List of brokers that are part of the Kafka system that we wish to
-  * interact with. The format is host1:port1,host2:port2.
-  */
+   * List of brokers that are part of the Kafka system that we wish to
+   * interact with. The format is host1:port1,host2:port2.
+   */
   brokerListString: String,
 
   /**
-  * The timeout to use for the simple consumer when fetching metadata from
-  * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
-  */
+   * The timeout to use for the simple consumer when fetching metadata from
+   * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
+   */
   timeout: Int = Int.MaxValue,
 
   /**
-  * The buffer size to use for the simple consumer when fetching metadata
-  * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
-  * configuration.
-  */
+   * The buffer size to use for the simple consumer when fetching metadata
+   * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
+   * configuration.
+   */
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
 
   /**
-  * The client ID to use for the simple consumer when fetching metadata from
-  * Kafka. Equivalent to Kafka's client.id configuration.
-  */
-  clientId: String = UUID.randomUUID.toString,
-
-  /**
-  * A function that returns a Zookeeper client to connect to fetch the meta
-  * data information
-  */
-  connectZk: () => ZkClient,
-
-  /**
-   * Replication factor for the Changelog topic in kafka
-   * Kafka properties to be used during the Changelog topic creation
+   * The client ID to use for the simple consumer when fetching metadata from
+   * Kafka. Equivalent to Kafka's client.id configuration.
    */
-  topicMetaInformation: Map[String, ChangeLogInfo] =  Map[String, ChangeLogInfo]()
-  ) extends SystemAdmin with Logging {
+  clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
@@ -274,82 +259,4 @@
 
     offsets
   }
-
-  private def createTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) {
-    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Attempting to create change log topic %s." format topicName)
-    info("Using partition count "+ numKafkaChangeLogPartitions + " for creating change log topic")
-    val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangeLogException("Unable to find topic information for topic " + topicName))
-    retryBackoff.run(
-      loop => {
-        val zkClient = connectZk()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            topicName,
-            numKafkaChangeLogPartitions,
-            topicMetaInfo.replicationFactor,
-            topicMetaInfo.kafkaProps)
-        } finally {
-          zkClient.close
-        }
-
-        info("Created changelog topic %s." format topicName)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: TopicExistsException =>
-            info("Changelog topic %s already exists." format topicName)
-            loop.done
-          case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  private def validateTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) {
-    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Validating changelog topic %s." format topicName)
-    retryBackoff.run(
-      loop => {
-        val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(topicName)
-        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
-
-        val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount < numKafkaChangeLogPartitions) {
-          throw new KafkaChangeLogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format(topicName, topicMetadata.partitionsMetadata.length, numKafkaChangeLogPartitions))
-        }
-
-        info("Successfully validated changelog topic %s." format topicName)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: KafkaChangeLogException => throw e
-          case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  /**
-   * Exception to be thrown when the change log stream creation or validation has failed
-   */
-  class KafkaChangeLogException(s: String, t: Throwable) extends SamzaException(s, t) {
-    def this(s: String) = this(s, null)
-  }
-
-  override def createChangelogStream(topicName: String, numKafkaChangeLogPartitions: Int) = {
-    createTopicInKafka(topicName, numKafkaChangeLogPartitions)
-    validateTopicInKafka(topicName, numKafkaChangeLogPartitions)
-  }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 9ab836c..4ed5e88 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,9 +19,6 @@
 
 package org.apache.samza.system.kafka
 
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
@@ -98,23 +95,12 @@
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val topicNames = config.getChangelogTopicNames()
-    var topicMetaInformation = Map[String, ChangeLogInfo]()
-
-    // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
-    for(topicName <- topicNames) {
-      val replicationFactor = config.getChangelogStreamReplicationFactor(topicName).getOrElse("2").toInt
-      val changelogInfo = ChangeLogInfo(replicationFactor, config.getChangelogKafkaProperties(topicName))
-      topicMetaInformation += topicName -> changelogInfo
-    }
 
     new KafkaSystemAdmin(
       systemName,
       brokerListString,
       timeout,
       bufferSize,
-      clientId,
-      () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer),
-      topicMetaInformation)
+      clientId)
   }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index f1b7511..d660b91 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,9 +19,11 @@
 
 package org.apache.samza.util
 
-import org.apache.samza.config.{Config, ConfigException}
+import org.apache.samza.config.{KafkaConfig, Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
 import java.util.concurrent.atomic.AtomicLong
+import kafka.client.ClientUtils
+import org.apache.samza.SamzaException
 
 object KafkaUtil {
   val counter = new AtomicLong(0)
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index a5af465..5ceb109 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -40,7 +40,6 @@
 
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.SystemStreamPartition
@@ -165,10 +164,6 @@
 class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
-  val systemName = "test"
-  // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
-
   def testShouldAssembleMetadata {
     val oldestOffsets = Map(
       new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1",
@@ -210,6 +205,8 @@
 
   @Test
   def testShouldGetOldestNewestAndNextOffsets {
+    val systemName = "test"
+    val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
 
     // Create an empty topic with 50 partitions, but with no offsets.
     createTopic
@@ -274,7 +271,7 @@
 
   @Test
   def testNonExistentTopic {
-
+    val systemAdmin = new KafkaSystemAdmin("test", brokers)
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
@@ -283,6 +280,7 @@
 
   @Test
   def testOffsetsAfter {
+    val systemAdmin = new KafkaSystemAdmin("test", brokers)
     val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
     val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
     val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
@@ -292,7 +290,7 @@
     assertEquals("3", offsetsAfter(ssp2))
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
     import kafka.api.{ TopicMetadata, TopicMetadataResponse }
 
     // Simulate Kafka telling us that the leader for the topic is not available
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index c0a20af..fa1d51b 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
@@ -54,12 +53,7 @@
     return metadata;
   }
 
-    @Override
-    public void createChangelogStream(String streamName, int numOfPartitions) {
-        throw new SamzaException("Method not implemented");
-    }
-
-    @Override
+  @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index b23968b..118f5ee 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,7 +22,6 @@
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
-import java.util.logging.{Level, LogManager, Handler, Logger}
 
 import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
@@ -88,9 +87,6 @@
   val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
   val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.setProperty("auto.create.topics.enable","false")
-  props2.setProperty("auto.create.topics.enable","false")
-  props3.setProperty("auto.create.topics.enable","false")
 
   val config = new java.util.Properties()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
@@ -110,7 +106,6 @@
 
   @BeforeClass
   def beforeSetupServers {
-
     zookeeper = new EmbeddedZookeeper(zkConnect)
     server1 = TestUtils.createServer(new KafkaConfig(props1))
     server2 = TestUtils.createServer(new KafkaConfig(props2))
@@ -129,10 +124,16 @@
       INPUT_TOPIC,
       TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
+
+    AdminUtils.createTopic(
+      zkClient,
+      STATE_TOPIC,
+      TOTAL_TASK_NAMES,
+      REPLICATION_FACTOR)
   }
 
   def validateTopics {
-    val topics = Set(INPUT_TOPIC)
+    val topics = Set(STATE_TOPIC, INPUT_TOPIC)
     var done = false
     var retries = 0
 
@@ -207,7 +208,6 @@
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
     "stores.mystore.changelog" -> "kafka.mystore",
-    "stores.mystore.changelog.replication.factor" -> "1",
 
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
     // Always start consuming at offset 0. This avoids a race condition between