blob: 61492768b600682dfbb6c883c6616a0301233b7e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import java.util
import kafka.utils.nonthreadsafe
import scala.collection.Map
case class MemberSummary(memberId: String,
clientId: String,
clientHost: String,
metadata: Array[Byte],
assignment: Array[Byte])
/**
* Member metadata contains the following metadata:
*
* Heartbeat metadata:
* 1. negotiated heartbeat session timeout
* 2. timestamp of the latest heartbeat
*
* Protocol metadata:
* 1. the list of supported protocols (ordered by preference)
* 2. the metadata associated with each protocol
*
* In addition, it also contains the following state information:
*
* 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
* its rebalance callback will be kept in the metadata if the
* member has sent the join group request
* 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
* is kept in metadata until the leader provides the group assignment
* and the group transitions to stable
*/
@nonthreadsafe
private[coordinator] class MemberMetadata(val memberId: String,
val groupId: String,
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {
var assignment: Array[Byte] = Array.empty[Byte]
var awaitingJoinCallback: JoinGroupResult => Unit = null
var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
var latestHeartbeat: Long = -1
var isLeaving: Boolean = false
def protocols = supportedProtocols.map(_._1).toSet
/**
* Get metadata corresponding to the provided protocol.
*/
def metadata(protocol: String): Array[Byte] = {
supportedProtocols.find(_._1 == protocol) match {
case Some((_, metadata)) => metadata
case None =>
throw new IllegalArgumentException("Member does not support protocol")
}
}
/**
* Check if the provided protocol metadata matches the currently stored metadata.
*/
def matches(protocols: List[(String, Array[Byte])]): Boolean = {
if (protocols.size != this.supportedProtocols.size)
return false
for (i <- 0 until protocols.size) {
val p1 = protocols(i)
val p2 = supportedProtocols(i)
if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
return false
}
true
}
def summary(protocol: String): MemberSummary = {
MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
}
def summaryNoMetadata(): MemberSummary = {
MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
}
/**
* Vote for one of the potential group protocols. This takes into account the protocol preference as
* indicated by the order of supported protocols and returns the first one also contained in the set
*/
def vote(candidates: Set[String]): String = {
supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {
case Some((protocol, _)) => protocol
case None =>
throw new IllegalArgumentException("Member does not support any of the candidate protocols")
}
}
override def toString = {
"[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
}
}