blob: c773382683a83142cf22c689cee510b4a756c8b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io.{File, IOException}
import java.net.SocketTimeoutException
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import com.yammer.metrics.core.Gauge
import kafka.admin.AdminUtils
import kafka.api.KAFKA_0_9_0
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
import kafka.controller.{ControllerStats, KafkaController}
import kafka.coordinator.GroupCoordinator
import kafka.log.{CleanerConfig, LogConfig, LogManager}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
import kafka.network.{BlockingChannel, SocketServer}
import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{ClusterResource, Node}
import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}
object KafkaServer {
// Copy the subset of properties that are relevant to Logs
// I'm listing out individual properties here since the names are slightly different in each Config class...
private[kafka] def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
val logProps = new util.HashMap[String, Object]()
logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs)
logProps
}
private[server] def metricConfig(kafkaConfig: KafkaConfig): MetricConfig = {
new MetricConfig()
.samples(kafkaConfig.metricNumSamples)
.recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel))
.timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
}
}
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private val jmxPrefix: String = "kafka.server"
var metrics: Metrics = null
val brokerState: BrokerState = new BrokerState
var apis: KafkaApis = null
var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
var replicaManager: ReplicaManager = null
var adminManager: AdminManager = null
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
var groupCoordinator: GroupCoordinator = null
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var kafkaHealthcheck: KafkaHealthcheck = null
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
var zkUtils: ZkUtils = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
def clusterId: String = _clusterId
newGauge(
"BrokerState",
new Gauge[Int] {
def value = brokerState.currentState
}
)
newGauge(
"ClusterId",
new Gauge[String] {
def value = clusterId
}
)
newGauge(
"yammer-metrics-count",
new Gauge[Int] {
def value = {
com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
}
}
)
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
try {
info("starting")
if(isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if(startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
brokerState.newState(Starting)
/* start scheduler */
kafkaScheduler.startup()
/* setup zookeeper */
zkUtils = initZk()
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkUtils)
info(s"Cluster ID = $clusterId")
/* generate brokerId */
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
/* create and configure metrics */
val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup()
/* start replica manager */
replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
isShuttingDown, quotaManagers.follower)
replicaManager.startup()
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
clusterId, time)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
Mx4jLoader.maybeLoad()
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else
endpoint
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
checkpointBrokerId(config.brokerId)
/* register broker metrics */
registerStats()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
val clusterResourceListeners = new ClusterResourceListeners
clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
}
private def initZk(): ZkUtils = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
val chrootIndex = config.zkConnect.indexOf("/")
val chrootOption = {
if (chrootIndex > 0) Some(config.zkConnect.substring(chrootIndex))
else None
}
val secureAclsEnabled = config.zkEnableSecureAcls
val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled()
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the verification of the JAAS login file failed.")
chrootOption.foreach { chroot =>
val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
sessionTimeout = config.zkSessionTimeoutMs,
connectionTimeout = config.zkConnectionTimeoutMs,
secureAclsEnabled)
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
info(s"Created zookeeper path $chroot")
zkClientForChrootCreation.zkClient.close()
}
val zkUtils = ZkUtils(config.zkConnect,
sessionTimeout = config.zkSessionTimeoutMs,
connectionTimeout = config.zkConnectionTimeoutMs,
secureAclsEnabled)
zkUtils.setupCommonPaths()
zkUtils
}
def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
/**
* Performs controlled shutdown
*/
private def controlledShutdown() {
def node(broker: Broker): Node = {
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
}
val socketTimeoutMs = config.controllerSocketTimeoutMs
def networkClientControlledShutdown(retries: Int): Boolean = {
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
LoginType.SERVER,
config.values,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable)
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
"kafka-server-controlled-shutdown",
Map.empty.asJava,
false,
channelBuilder
)
new NetworkClient(
selector,
metadataUpdater,
config.brokerId.toString,
1,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
time,
false)
}
var shutdownSucceeded: Boolean = false
try {
var remainingRetries = retries
var prevController: Broker = null
var ioException = false
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
import NetworkClientBlockingOps._
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
val controllerId = zkUtils.getController()
zkUtils.getBrokerInfo(controllerId) match {
case Some(broker) =>
// if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
// attempt, connect to the most recent controller
if (ioException || broker != prevController) {
ioException = false
if (prevController != null)
networkClient.close(node(prevController).idString)
prevController = broker
metadataUpdater.setNodes(Seq(node(prevController)).asJava)
}
case None => //ignore and try again
}
// 2. issue a controlled shutdown to the controller
if (prevController != null) {
try {
if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
time.milliseconds(), true)
val clientResponse = networkClient.blockingSendAndReceive(request)(time)
val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
shutdownSucceeded = true
info("Controlled shutdown succeeded")
}
else {
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
}
}
catch {
case ioe: IOException =>
ioException = true
warn("Error during controlled shutdown, possibly because leader movement took longer than the configured socket.timeout.ms: %s".format(ioe.getMessage))
// ignore and try again
}
}
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally
networkClient.close()
shutdownSucceeded
}
def blockingChannelControlledShutdown(retries: Int): Boolean = {
var remainingRetries = retries
var channel: BlockingChannel = null
var prevController: Broker = null
var shutdownSucceeded: Boolean = false
try {
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
val controllerId = zkUtils.getController()
zkUtils.getBrokerInfo(controllerId) match {
case Some(broker) =>
if (channel == null || prevController == null || !prevController.equals(broker)) {
// if this is the first attempt or if the controller has changed, create a channel to the most recent
// controller
if (channel != null)
channel.disconnect()
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
channel = new BlockingChannel(brokerEndPoint.host,
brokerEndPoint.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
prevController = broker
}
case None => //ignore and try again
}
// 2. issue a controlled shutdown to the controller
if (channel != null) {
var response: NetworkReceive = null
try {
// send the controlled shutdown request
val request = new kafka.api.ControlledShutdownRequest(0, correlationId.getAndIncrement, None, config.brokerId)
channel.send(request)
response = channel.receive()
val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.isEmpty) {
shutdownSucceeded = true
info ("Controlled shutdown succeeded")
}
else {
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
}
}
catch {
case ioe: java.io.IOException =>
channel.disconnect()
channel = null
warn("Error during controlled shutdown, possibly because leader movement took longer than the configured socket.timeout.ms: %s".format(ioe.getMessage))
// ignore and try again
}
}
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally {
if (channel != null) {
channel.disconnect()
channel = null
}
}
shutdownSucceeded
}
if (startupComplete.get() && config.controlledShutdownEnable) {
// We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
// of time and try again for a configured number of retries. If all the attempt fails, we simply force
// the shutdown.
info("Starting controlled shutdown")
brokerState.newState(PendingControlledShutdown)
val shutdownSucceeded =
// Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
// `RequestHeader`, which is used by `NetworkClient`
if (config.interBrokerProtocolVersion >= KAFKA_0_9_0)
networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}
/**
* Shutdown API for shutting down a single instance of the Kafka server.
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
try {
info("shutting down")
if(isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
// To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
// last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
if(socketServer != null)
CoreUtils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
if(apis != null)
CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
if(replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown())
if(groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown())
if(logManager != null)
CoreUtils.swallow(logManager.shutdown())
if(kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown())
if(zkUtils != null)
CoreUtils.swallow(zkUtils.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
brokerState.newState(NotRunning)
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
shutdownLatch.countDown()
info("shut down completed")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}
/**
* After calling shutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
topic -> LogConfig.fromProps(defaultProps, configs)
}
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
time = time)
}
/**
* Generates new brokerId if enabled or reads from meta.properties based on following conditions
* <ol>
* <li> config has no broker.id provided and broker id generation is enabled, generates a broker.id based on Zookeeper's sequence
* <li> stored broker.id in meta.properties doesn't match in all the log.dirs throws InconsistentBrokerIdException
* <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException
* <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
* <ol>
*
* @return A brokerId.
*/
private def getBrokerId: Int = {
var brokerId = config.brokerId
val brokerIdSet = mutable.HashSet[Int]()
for (logDir <- config.logDirs) {
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
brokerMetadataOpt.foreach { brokerMetadata =>
brokerIdSet.add(brokerMetadata.brokerId)
}
}
if(brokerIdSet.size > 1)
throw new InconsistentBrokerIdException(
s"Failed to match broker.id across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
s"or partial data was manually copied from another broker. Found $brokerIdSet")
else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
throw new InconsistentBrokerIdException(
s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
s"If you moved your data, make sure your configured broker.id matches. " +
s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
else if(brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
brokerId = generateBrokerId
else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
brokerId = brokerIdSet.last
brokerId
}
private def checkpointBrokerId(brokerId: Int) {
var logDirsWithoutMetaProps: List[String] = List()
for (logDir <- config.logDirs) {
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
if(brokerMetadataOpt.isEmpty)
logDirsWithoutMetaProps ++= List(logDir)
}
for(logDir <- logDirsWithoutMetaProps) {
val checkpoint = brokerMetadataCheckpoints(logDir)
checkpoint.write(BrokerMetadata(brokerId))
}
}
private def generateBrokerId: Int = {
try {
zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)
throw new GenerateBrokerIdException("Failed to generate broker.id", e)
}
}
}