blob: cb6dbdf21c43eb749a408b81f3ddcfded254d600 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint.kafka
import org.apache.samza.config.{ KafkaConfig, Config }
import org.apache.samza.SamzaException
import java.util.Properties
import kafka.producer.Producer
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.Partition
import grizzled.slf4j.Logging
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
import org.apache.samza.util.Util
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer
import org.apache.samza.checkpoint.CheckpointManagerFactory
import org.apache.samza.checkpoint.CheckpointManager
/**
 * Builds [[KafkaCheckpointManager]] instances from a job's configuration.
 *
 * Checkpoints go to a per-job topic named `__samza_checkpoint_<jobName>_<jobId>`,
 * where underscores inside the job name and job id are replaced by hyphens.
 */
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {

  /**
   * Reads the checkpoint-related settings from `config` and assembles a
   * [[KafkaCheckpointManager]] from them.
   *
   * Defaults applied here: a replication factor of "3" and a job id of "1" when
   * the corresponding settings are absent.
   *
   * @param config   job configuration supplying the checkpoint system, the Kafka
   *                 producer/consumer settings, the job name and the job id.
   * @param registry metrics registry; not used by this implementation.
   * @return a newly constructed [[KafkaCheckpointManager]].
   * @throws SamzaException if the checkpoint system, `zookeeper.connect`, the job
   *                        name, or the producer broker list is not defined.
   */
  def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
    // All "required setting is missing" failures are raised as SamzaException.
    def fail(message: String): Nothing = throw new SamzaException(message)

    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
    val systemName = config.getCheckpointSystem.getOrElse(
      fail("no system defined for Kafka's checkpoint manager."))

    // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
    val maxSendRetries = Int.MaxValue - 1
    // Settings handed to getKafkaSystemProducerConfig alongside the system's own
    // producer configuration: synchronous sends, acks of -1, near-unbounded retries.
    val producerOverrides = Map(
      "request.required.acks" -> "-1",
      "producer.type" -> "sync",
      "message.send.max.retries" -> maxSendRetries.toString)

    val producerConfig =
      config.getKafkaSystemProducerConfig(systemName, clientId, producerOverrides)
    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)

    // The configured value is parsed with toInt; a non-numeric value fails here.
    val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt

    val socketTimeoutMs = consumerConfig.socketTimeoutMs
    val receiveBufferBytes = consumerConfig.socketReceiveBufferBytes
    // must be > buffer size
    val fetchBytes = consumerConfig.fetchMessageMaxBytes

    // Connections are handed over as thunks, so nothing is opened by this factory itself.
    val producerFactory = () => new Producer[Partition, Array[Byte]](producerConfig)

    val zkConnect = Option(consumerConfig.zkConnect).getOrElse(
      fail("no zookeeper.connect defined in config"))
    // NOTE(review): both numeric arguments are hard-coded to 6000 (presumably the
    // session and connection timeouts in ms -- confirm against ZkClient).
    val zkClientFactory = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)

    val jobName = config.getName.getOrElse(fail("Missing job name in configs"))
    val jobId = config.getJobId.getOrElse("1")

    val brokerList = Option(producerConfig.brokerList).getOrElse(
      fail("No broker list defined in config for %s.".format(systemName)))
    val topicMetadataStore =
      new ClientUtilTopicMetadataStore(brokerList, clientId, socketTimeoutMs)

    val checkpointTopic = getTopic(jobName, jobId)

    // This is a reasonably expensive operation and the TaskInstance already knows
    // the answer, so that information should be used instead.
    // Counts the distinct partitions across the job's input stream partitions.
    val distinctPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet
    val partitionCount = distinctPartitions.size

    new KafkaCheckpointManager(
      clientId,
      checkpointTopic,
      systemName,
      partitionCount,
      replicationFactor,
      socketTimeoutMs,
      receiveBufferBytes,
      fetchBytes,
      topicMetadataStore,
      producerFactory,
      zkClientFactory)
  }

  /**
   * Derives the checkpoint topic name for a job.
   *
   * Every underscore in `jobName` and `jobId` is replaced with a hyphen before
   * the two are substituted into `__samza_checkpoint_%s_%s`.
   */
  private def getTopic(jobName: String, jobId: String) = {
    def hyphenate(part: String) = part.replaceAll("_", "-")
    "__samza_checkpoint_%s_%s".format(hyphenate(jobName), hyphenate(jobId))
  }
}