/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.log.LogConfig
import kafka.message.UncompressedCodec
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
import scala.collection.{Map, Seq, immutable}
case class GroupConfig(groupMinSessionTimeoutMs: Int,
                       groupMaxSessionTimeoutMs: Int)
case class JoinGroupResult(members: Map[String, Array[Byte]],
                           memberId: String,
                           generationId: Int,
                           subProtocol: String,
                           leaderId: String,
                           errorCode: Short)
/**
* GroupCoordinator handles general group membership and offset management.
*
* Each Kafka server instantiates a coordinator which is responsible for a set of
* groups. Groups are assigned to coordinators based on their group names.
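*
* A rough usage sketch, illustrative only (the broker performs the actual wiring at startup
* and dispatches client requests to the handle* methods):
* {{{
*   val coordinator = GroupCoordinator(config, zkUtils, replicaManager, time)
*   coordinator.startup()
*   // ... route JoinGroup / SyncGroup / Heartbeat / OffsetCommit requests to the handle* methods ...
*   coordinator.shutdown()
* }}}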
*/
class GroupCoordinator(val brokerId: Int,
                       val groupConfig: GroupConfig,
                       val offsetConfig: OffsetConfig,
                       val groupManager: GroupMetadataManager,
                       time: Time) extends Logging {
type JoinCallback = JoinGroupResult => Unit
type SyncCallback = (Array[Byte], Short) => Unit
this.logIdent = "[GroupCoordinator " + brokerId + "]: "
private val isActive = new AtomicBoolean(false)
private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
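/**
* Properties for the internal offsets topic: compacted cleanup policy, segment size taken
* from the offset config, and no broker-side compression.
*/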
def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
props
}
/**
* NOTE: If a group lock and the metadataLock are needed simultaneously,
* be sure to acquire the group lock before the metadataLock to prevent deadlock.
*/
/**
* Startup logic, executed when the server starts up.
*/
def startup() {
info("Starting up.")
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
isActive.set(true)
info("Startup complete.")
}
/**
* Shutdown logic, executed when the server shuts down.
* The ordering of actions should be the reverse of the startup process.
*/
def shutdown() {
info("Shutting down.")
isActive.set(false)
groupManager.shutdown()
heartbeatPurgatory.shutdown()
joinPurgatory.shutdown()
info("Shutdown complete.")
}
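/**
* Handle a JoinGroup request: validate that this broker is the active coordinator for the
* group, that the group id is valid, and that the session timeout is within the configured
* bounds; create the group metadata if the group does not yet exist and the member id is
* unknown, then delegate to doJoinGroup. The result is delivered asynchronously through
* the given responseCallback.
*/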
def handleJoinGroup(groupId: String,
                    memberId: String,
                    clientId: String,
                    clientHost: String,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback) {
if (!isActive.get) {
responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
} else if (!validGroupId(groupId)) {
responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
} else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
} else {
// only try to create the group if the group does not yet exist AND
// the member id is unknown; if a member id is specified but the group
// does not exist, we should reject the request
var group = groupManager.getGroup(groupId)
if (group == null) {
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
} else {
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
}
}
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        clientId: String,
                        clientHost: String,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback) {
group synchronized {
if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
// if the member is trying to register with an unrecognized id, send a response to let
// it reset its member id and retry
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
group.currentState match {
case Dead =>
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; most likely the group has migrated to another
// coordinator or is in a transient unstable phase. Let the member retry
// joining without the specified member id.
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
case AwaitingSync =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
// member is joining with the same metadata (which could be because it failed to
// receive the initial JoinGroup response), so just return current group information
// for the current generation.
responseCallback(JoinGroupResult(
members = if (memberId == group.leaderId) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocol,
leaderId = group.leaderId,
errorCode = Errors.NONE.code))
} else {
// member has changed metadata, so force a rebalance
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
}
case Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (memberId == group.leaderId || !member.matches(protocols)) {
// force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
// The latter allows the leader to trigger rebalances for changes affecting assignment
// which do not affect the member metadata (such as topic metadata changes for the consumer)
updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
// for followers with no actual change to their metadata, just return group information
// for the current generation which will allow them to issue SyncGroup
responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocol,
leaderId = group.leaderId,
errorCode = Errors.NONE.code))
}
}
}
if (group.is(PreparingRebalance))
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
}
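/**
* Handle a SyncGroup request: the group leader supplies the assignment for the current
* generation, which the coordinator persists and then propagates to all members; followers
* have their callback parked until the assignment is available, or receive the current
* assignment immediately if the group is already Stable.
*/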
def handleSyncGroup(groupId: String,
                    generation: Int,
                    memberId: String,
                    groupAssignment: Map[String, Array[Byte]],
                    responseCallback: SyncCallback) {
if (!isActive.get) {
responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else {
val group = groupManager.getGroup(groupId)
if (group == null)
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
else
doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
}
}
private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback) {
var delayedGroupStore: Option[DelayedStore] = None
group synchronized {
if (!group.has(memberId)) {
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
} else {
group.currentState match {
case Dead =>
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
case PreparingRebalance =>
responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
case AwaitingSync =>
group.get(memberId).awaitingSyncCallback = responseCallback
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
// if this is the leader, then we can attempt to persist state and transition to stable
if (memberId == group.leaderId) {
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
// fill any missing members with an empty assignment
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (errorCode != Errors.NONE.code) {
resetAndPropagateAssignmentError(group, errorCode)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}
}))
}
case Stable =>
// if the group is stable, we just return the current assignment
val memberMetadata = group.get(memberId)
responseCallback(memberMetadata.assignment, Errors.NONE.code)
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
}
}
}
// store the group metadata without holding the group lock to avoid the potential
// for deadlock when the callback is invoked
delayedGroupStore.foreach(groupManager.store)
}
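/**
* Handle a LeaveGroup request: cancel the departing member's heartbeat and treat the departure
* as a member failure, which removes the member from the group and may trigger a rebalance.
*/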
def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
if (!isActive.get) {
responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
} else {
val group = groupManager.getGroup(groupId)
if (group == null) {
// if the group is not found, it means some other thread has just removed the group
// from the coordinator metadata; most likely the group has migrated to another
// coordinator or is in a transient unstable phase. Let the consumer retry
// joining without a specified consumer id.
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else {
group synchronized {
if (group.is(Dead)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (!group.has(consumerId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else {
val member = group.get(consumerId)
removeHeartbeatForLeavingMember(group, member)
onMemberFailure(group, member)
responseCallback(Errors.NONE.code)
}
}
}
}
}
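/**
* Handle a Heartbeat request: verify the group, member, and generation and, if everything
* matches, complete the current heartbeat expectation and schedule the next expiration.
*/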
def handleHeartbeat(groupId: String,
                    memberId: String,
                    generationId: Int,
                    responseCallback: Short => Unit) {
if (!isActive.get) {
responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else if (isCoordinatorLoadingInProgress(groupId)) {
// the group is still loading, so just respond blindly with no error
responseCallback(Errors.NONE.code)
} else {
val group = groupManager.getGroup(groupId)
if (group == null) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else {
group synchronized {
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; most likely the group has migrated to another
// coordinator or is in a transient unstable phase. Let the member retry
// joining without the specified member id.
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (!group.is(Stable)) {
responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
} else if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else {
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE.code)
}
}
}
}
}
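/**
* Handle an OffsetCommit request. If the group is unknown and the generation id is negative,
* the group is assumed to be managing partitions itself and the commit is stored directly;
* otherwise the member id and generation must match the group's current state before the
* offsets are handed to the group metadata manager for storage.
*/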
def handleCommitOffsets(groupId: String,
                        memberId: String,
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
var delayedOffsetStore: Option[DelayedStore] = None
if (!isActive.get) {
responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
} else {
val group = groupManager.getGroup(groupId)
if (group == null) {
if (generationId < 0)
// the group is not relying on Kafka for partition management, so allow the commit
delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
responseCallback))
else
// the group has failed over to this coordinator (which will be handled in KAFKA-2017),
// or this is a request coming from an older generation. Either way, reject the commit
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else {
group synchronized {
if (group.is(Dead)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
} else if (group.is(AwaitingSync)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
} else if (!group.has(memberId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else {
delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
offsetMetadata, responseCallback))
}
}
}
}
// store the offsets without holding the group lock
delayedOffsetStore.foreach(groupManager.store)
}
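/**
* Handle an OffsetFetch request: offsets are returned regardless of the group's current state,
* since the group may be using Kafka offset storage without Kafka-based group management.
*/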
def handleFetchOffsets(groupId: String,
                       partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
if (!isActive.get) {
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
} else if (isCoordinatorLoadingInProgress(groupId)) {
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
} else {
// return offsets blindly regardless of the current group state, since the group may be using
// Kafka commit storage without automatic group management
groupManager.getOffsets(groupId, partitions)
}
}
def handleListGroups(): (Errors, List[GroupOverview]) = {
if (!isActive.get) {
(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
} else {
val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
(errorCode, groupManager.currentGroups.map(_.overview).toList)
}
}
def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
if (!isActive.get) {
(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
} else if (!isCoordinatorForGroup(groupId)) {
(Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
} else if (isCoordinatorLoadingInProgress(groupId)) {
(Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
} else {
val group = groupManager.getGroup(groupId)
if (group == null) {
(Errors.NONE, GroupCoordinator.DeadGroup)
} else {
group synchronized {
(Errors.NONE, group.summary)
}
}
}
}
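/**
* Invoked when a group is unloaded from this coordinator (e.g. on emigration): the group is
* transitioned to Dead and any pending join or sync callbacks are completed with
* NOT_COORDINATOR_FOR_GROUP so that members re-discover the new coordinator.
*/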
private def onGroupUnloaded(group: GroupMetadata) {
group synchronized {
info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
val previousState = group.currentState
group.transitionTo(Dead)
previousState match {
case Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
if (member.awaitingJoinCallback != null) {
member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
member.awaitingJoinCallback = null
}
}
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
case Stable | AwaitingSync =>
for (member <- group.allMemberMetadata) {
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
member.awaitingSyncCallback = null
}
heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
}
}
}
}
private def onGroupLoaded(group: GroupMetadata) {
group synchronized {
info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
assert(group.is(Stable))
group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
}
}
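/**
* Load the groups (and their offsets) owned by the given partition of the offsets topic onto
* this coordinator, typically invoked when this broker becomes the leader for that partition.
*/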
def handleGroupImmigration(offsetTopicPartitionId: Int) {
groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
}
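/**
* Unload the groups owned by the given partition of the offsets topic, typically invoked when
* this broker loses leadership for that partition; affected groups are torn down via
* onGroupUnloaded.
*/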
def handleGroupEmigration(offsetTopicPartitionId: Int) {
groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
assert(group.is(AwaitingSync))
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
propagateAssignment(group, Errors.NONE.code)
}
private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
assert(group.is(AwaitingSync))
group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
propagateAssignment(group, errorCode)
}
private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
for (member <- group.allMemberMetadata) {
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(member.assignment, errorCode)
member.awaitingSyncCallback = null
// reset the session timeout for members after propagating the member's assignment.
// This is because if any member's session expired while we were still awaiting either
// the leader sync group or the storage callback, its expiration will be ignored and no
// future heartbeat expectations will be scheduled.
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
private def validGroupId(groupId: String): Boolean = {
groupId != null && !groupId.isEmpty
}
private def joinError(memberId: String, errorCode: Short): JoinGroupResult = {
JoinGroupResult(
members=Map.empty,
memberId=memberId,
generationId=0,
subProtocol=GroupCoordinator.NoProtocol,
leaderId=GroupCoordinator.NoLeader,
errorCode=errorCode)
}
/**
* Complete existing DelayedHeartbeats for the given member and schedule the next one
*/
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
// complete current heartbeat expectation
member.latestHeartbeat = time.milliseconds()
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
// reschedule the next heartbeat expiration deadline
val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
member.isLeaving = true
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
}
private def addMemberAndRebalance(sessionTimeoutMs: Int,
                                  clientId: String,
                                  clientHost: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback) = {
// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
member.awaitingJoinCallback = callback
group.add(member.memberId, member)
maybePrepareRebalance(group)
member
}
private def updateMemberAndRebalance(group: GroupMetadata,
                                     member: MemberMetadata,
                                     protocols: List[(String, Array[Byte])],
                                     callback: JoinCallback) {
member.supportedProtocols = protocols
member.awaitingJoinCallback = callback
maybePrepareRebalance(group)
}
private def maybePrepareRebalance(group: GroupMetadata) {
group synchronized {
if (group.canRebalance)
prepareRebalance(group)
}
}
private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
val rebalanceTimeout = group.rebalanceTimeout
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
group.remove(member.memberId)
group.currentState match {
case Dead =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group synchronized {
if (group.notYetRejoinedMembers.isEmpty)
forceComplete()
else false
}
}
def onExpireJoin() {
// TODO: add metrics for restabilize timeouts
}
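/**
* Invoked when the delayed join (rebalance) operation completes: members that did not rejoin
* in time are removed, the group is marked Dead and removed if it has become empty, and
* otherwise the next generation is initialized and JoinGroup responses are sent to all
* remaining members (the leader additionally receives the full member metadata).
*/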
def onCompleteJoin(group: GroupMetadata) {
group synchronized {
val failedMembers = group.notYetRejoinedMembers
if (group.isEmpty || !failedMembers.isEmpty) {
failedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}
// TODO KAFKA-2720: only remove group in the background thread
if (group.isEmpty) {
group.transitionTo(Dead)
groupManager.removeGroup(group)
info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
}
}
if (!group.is(Dead)) {
group.initNextGeneration()
info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
// trigger the awaiting join group response callback for all the members after rebalancing
for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
memberId=member.memberId,
generationId=group.generationId,
subProtocol=group.protocol,
leaderId=group.leaderId,
errorCode=Errors.NONE.code)
member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
}
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group synchronized {
if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
forceComplete()
else false
}
}
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
group synchronized {
if (!shouldKeepMemberAlive(member, heartbeatDeadline))
onMemberFailure(group, member)
}
}
def onCompleteHeartbeat() {
// TODO: add metrics for complete heartbeats
}
def partitionFor(group: String): Int = groupManager.partitionFor(group)
private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
member.awaitingJoinCallback != null ||
member.awaitingSyncCallback != null ||
member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
private def isCoordinatorLoadingInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
}
object GroupCoordinator {
val NoState = ""
val NoProtocolType = ""
val NoProtocol = ""
val NoLeader = ""
val NoMembers = List[MemberSummary]()
val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
def apply(config: KafkaConfig,
          zkUtils: ZkUtils,
          replicaManager: ReplicaManager,
          time: Time): GroupCoordinator = {
val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
                                loadBufferSize = config.offsetsLoadBufferSize,
                                offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
                                offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
                                offsetsTopicNumPartitions = config.offsetsTopicPartitions,
                                offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
                                offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
                                offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
                              groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, time)
}
}