blob: 43f957711f44842994caaafd61f6207802432e08 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.log.LogManager
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
import kafka.network.{SocketServerStats, SocketServer}
import java.io.File
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig) extends Logging {
val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
private var logManager: LogManager = null
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
info("Starting Kafka server...")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
if (cleanShutDownFile.exists) {
needRecovery = false
cleanShutDownFile.delete
}
logManager = new LogManager(config,
scheduler,
SystemTime,
1000L * 60 * 60 * config.logRollHours,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
val handlers = new KafkaRequestHandlers(logManager)
socketServer = new SocketServer(config.port,
config.numThreads,
config.monitoringPeriodSecs,
handlers.handlerFor,
config.socketSendBuffer,
config.socketReceiveBuffer,
config.maxSocketRequestSize)
Utils.registerMBean(socketServer.stats, statsMBeanName)
socketServer.startup()
Mx4jLoader.maybeLoad
/**
* Registers this broker in ZK. After this, consumers can connect to broker.
* So this should happen after socket server start.
*/
logManager.startup()
info("Kafka server started.")
}
/**
* Shutdown API for shutting down a single instance of the Kafka server.
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("Shutting down Kafka server")
scheduler.shutdown()
if (socketServer != null)
socketServer.shutdown()
Utils.unregisterMBean(statsMBeanName)
if (logManager != null)
logManager.close()
val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
cleanShutDownFile.createNewFile
shutdownLatch.countDown()
info("Kafka server shut down completed")
}
}
/**
* After calling shutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
def getStats(): SocketServerStats = socketServer.stats
}