blob: ad8ed3a6291d15992abea124a7d933e3ae266911 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint.kafka
import org.I0Itec.zkclient.ZkClient
import grizzled.slf4j.Logging
import kafka.admin.AdminUtils
import kafka.api.FetchRequestBuilder
import kafka.api.OffsetRequest
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
import kafka.common.TopicExistsException
import kafka.consumer.SimpleConsumer
import kafka.producer.KeyedMessage
import kafka.producer.Partitioner
import kafka.producer.Producer
import kafka.serializer.Decoder
import kafka.serializer.Encoder
import kafka.utils.Utils
import kafka.utils.VerifiableProperties
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.checkpoint.CheckpointManager
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.serializers.Serde
import org.apache.samza.system.kafka.TopicMetadataCache
import org.apache.samza.util.TopicMetadataStore
import org.apache.samza.util.ExponentialSleepStrategy
/**
* Kafka checkpoint manager is used to store checkpoints in a single Kafka
* topic (the checkpoint topic) that is created, if necessary, and validated
* to have the expected number of partitions when the manager is started. To
* read a checkpoint for a given partition (e.g. partition 1), we simply read
* the last message from the partition of the checkpoint topic that has the
* same partition id. If that topic partition does not yet contain any
* messages, we assume that there is not yet any state for this partition, and
* return null instead of a checkpoint.
*/
class KafkaCheckpointManager(
  clientId: String,
  checkpointTopic: String,
  systemName: String,
  totalPartitions: Int,
  replicationFactor: Int,
  socketTimeout: Int,
  bufferSize: Int,
  fetchSize: Int,
  metadataStore: TopicMetadataStore,
  connectProducer: () => Producer[Partition, Array[Byte]],
  connectZk: () => ZkClient,
  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
  serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {

  /** Partitions that have been added through `register`. */
  var partitions = Set[Partition]()

  /**
   * Producer used by `writeCheckpoint`. It is connected on first use and is
   * null whenever no usable connection is held (initially, after a failed
   * send, and after `stop`).
   */
  var producer: Producer[Partition, Array[Byte]] = null

  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format (clientId, checkpointTopic, systemName))

  /**
   * Serializes the checkpoint with `serde` and sends it to the checkpoint
   * topic, passing the Samza partition to the KeyedMessage so the message can
   * be routed by partition. The send is wrapped in `retryBackoff`: after a
   * failure the current producer is closed and discarded, so the next attempt
   * reconnects through `connectProducer`.
   *
   * @param partition the partition the checkpoint belongs to
   * @param checkpoint the checkpoint to persist
   */
  def writeCheckpoint(partition: Partition, checkpoint: Checkpoint): Unit = {
    retryBackoff.run(
      loop => {
        if (producer == null) {
          producer = connectProducer()
        }
        producer.send(new KeyedMessage(checkpointTopic, null, partition, serde.toBytes(checkpoint)))
        loop.done
      },
      (exception, loop) => {
        warn("Failed to send checkpoint %s for partition %s: %s. Retrying." format (checkpoint, partition, exception))
        debug("Exception detail:", exception)
        // Throw away the (possibly broken) producer so the next attempt reconnects.
        closeProducer()
      })
  }

  /**
   * Reads the most recent checkpoint for the given partition by fetching the
   * last message of the matching partition of the checkpoint topic.
   *
   * Failures that are not a KafkaCheckpointException (for example a missing
   * leader or a broker error code) are logged and retried through
   * `retryBackoff`.
   *
   * @param partition the partition whose checkpoint should be read
   * @return the deserialized checkpoint, or null when the topic partition has
   *         no messages or the broker reports the offset as out of range
   * @throws KafkaCheckpointException on a hard failure: the partition or its
   *         offset information is missing, or the fetch did not return
   *         exactly one message
   * @throws SamzaException if the retry loop ends without producing a result
   */
  def readLastCheckpoint(partition: Partition): Checkpoint = {
    info("Reading checkpoint for partition %s." format partition.getPartitionId)

    val checkpoint = retryBackoff.run(
      loop => {
        // Assume checkpoint topic exists with correct partitions, since it should be verified on start.
        // Fetch the metadata for this checkpoint topic/partition pair.
        val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
        val metadata = metadataMap(checkpointTopic)
        val partitionMetadata = metadata.partitionsMetadata
          .find(_.partitionId == partition.getPartitionId)
          .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
        val partitionId = partitionMetadata.partitionId
        // A missing leader is reported as a plain SamzaException, so the retry handler retries it.
        val leader = partitionMetadata
          .leader
          .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))

        info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, checkpointTopic, partitionId))

        val consumer = new SimpleConsumer(
          leader.host,
          leader.port,
          socketTimeout,
          bufferSize,
          clientId)

        try {
          val topicAndPartition = new TopicAndPartition(checkpointTopic, partitionId)
          val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
            .partitionErrorAndOffsets
            .get(topicAndPartition)
            .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (checkpointTopic, partitionId)))

          // Fail or retry if there was an issue with the offset request.
          ErrorMapping.maybeThrowException(offsetResponse.error)

          val offset = offsetResponse
            .offsets
            .headOption
            .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (checkpointTopic, partitionId)))

          info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, checkpointTopic, partitionId))

          if (offset <= 0) {
            info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (checkpointTopic, partition))
            // Non-local return: leaves readLastCheckpoint itself, not just this
            // closure. The finally block below still closes the consumer.
            // NOTE(review): relies on retryBackoff.run not intercepting Scala's
            // control-flow throwable used for non-local returns - confirm.
            return null
          }

          val request = new FetchRequestBuilder()
            // Kafka returns 1 greater than the offset of the last message in
            // the topic, so subtract one to fetch the last message.
            .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize)
            .maxWait(500)
            .minBytes(1)
            .clientId(clientId)
            .build
          val messageSet = consumer.fetch(request)

          if (messageSet.hasError) {
            // Look the error code up once and reuse it for logging and handling.
            val errorCode = messageSet.errorCode(checkpointTopic, partitionId)
            warn("Got error code from broker for %s: %s" format (checkpointTopic, errorCode))
            if (errorCode == ErrorMapping.OffsetOutOfRangeCode) {
              warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (checkpointTopic, partitionId))
              // Non-local return, same as the empty-topic case above.
              return null
            }
            // Any other error code becomes an exception for the retry handler.
            ErrorMapping.maybeThrowException(errorCode)
          }

          val messages = messageSet.messageSet(checkpointTopic, partitionId).toList

          if (messages.length != 1) {
            throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
              + "messages back when fetching from checkpoint topic %s and partition %s. "
              + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
              + "so failing." format (messages.length, checkpointTopic, partition))
          }

          // Some back bending to go from message to checkpoint.
          val lastCheckpoint = serde.fromBytes(Utils.readBytes(messages.head.message.payload))

          loop.done

          lastCheckpoint
        } finally {
          consumer.close
        }
      },
      (exception, loop) => {
        exception match {
          // Hard failure: rethrow so the retry loop is abandoned.
          case e: KafkaCheckpointException => throw e
          case e: Exception =>
            warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e))
            debug("Exception detail:", e)
        }
      }).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId))

    info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint))

    checkpoint
  }

  /** Creates the checkpoint topic if needed and then validates its partition count. */
  def start: Unit = {
    createTopic
    validateTopic
  }

  /**
   * Records the partition in `partitions`.
   *
   * @param partition the partition to register
   */
  def register(partition: Partition): Unit = {
    partitions += partition
  }

  /**
   * Closes the producer, if one is connected, and forgets it so that a later
   * `writeCheckpoint` reconnects instead of sending on a closed producer.
   */
  def stop: Unit = {
    closeProducer()
  }

  /**
   * Closes the current producer, if any, and always resets the reference to
   * null, even when closing throws, so a closed producer is never reused.
   */
  private def closeProducer(): Unit = {
    if (producer != null) {
      try {
        producer.close
      } finally {
        producer = null
      }
    }
  }

  /**
   * Creates the checkpoint topic with `totalPartitions` partitions and
   * `replicationFactor` using a ZkClient obtained from `connectZk`, which is
   * closed after every attempt. A TopicExistsException counts as success; any
   * other exception is logged and the attempt is retried through
   * `retryBackoff`.
   */
  private def createTopic: Unit = {
    info("Attempting to create checkpoint topic %s with %s partitions." format (checkpointTopic, totalPartitions))
    retryBackoff.run(
      loop => {
        val zkClient = connectZk()
        try {
          AdminUtils.createTopic(
            zkClient,
            checkpointTopic,
            totalPartitions,
            replicationFactor)
        } finally {
          zkClient.close
        }

        info("Created checkpoint topic %s." format checkpointTopic)
        loop.done
      },
      (exception, loop) => {
        exception match {
          case e: TopicExistsException =>
            info("Checkpoint topic %s already exists." format checkpointTopic)
            loop.done
          case e: Exception =>
            warn("Failed to create topic %s: %s. Retrying." format (checkpointTopic, e))
            debug("Exception detail:", e)
        }
      })
  }

  /**
   * Checks that the checkpoint topic's metadata carries no error and that it
   * has exactly `totalPartitions` partitions. A partition count mismatch is a
   * hard failure (KafkaCheckpointException); other exceptions are logged and
   * retried through `retryBackoff`.
   */
  private def validateTopic: Unit = {
    info("Validating checkpoint topic %s." format checkpointTopic)
    retryBackoff.run(
      loop => {
        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
        val topicMetadata = topicMetadataMap(checkpointTopic)
        ErrorMapping.maybeThrowException(topicMetadata.errorCode)

        val partitionCount = topicMetadata.partitionsMetadata.length
        if (partitionCount != totalPartitions) {
          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (checkpointTopic, partitionCount, totalPartitions))
        }

        info("Successfully validated checkpoint topic %s." format checkpointTopic)
        loop.done
      },
      (exception, loop) => {
        exception match {
          // Hard failure: rethrow so the retry loop is abandoned.
          case e: KafkaCheckpointException => throw e
          case e: Exception =>
            warn("While trying to validate topic %s: %s. Retrying." format (checkpointTopic, e))
            debug("Exception detail:", e)
        }
      })
  }

  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format (systemName, checkpointTopic)
}
/**
* KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
* one to signal a hard failure, and the other to retry. The
* KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
* CheckpointManager can't recover from.
*/
class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
  /** Builds the exception from a message only; the cause passed on is null. */
  def this(s: String) = {
    this(s, null)
  }
}