/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log4j

import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.{PropertyConfigurator, Logger}
import java.util.Properties
import java.io.File
import kafka.consumer.SimpleConsumer
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
import kafka.zk.EmbeddedZookeeper
import junit.framework.Assert._
import kafka.api.FetchRequest
import kafka.serializer.Encoder
import kafka.message.Message
import kafka.producer.async.MissingConfigException
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}

class KafkaLog4jAppenderTest extends JUnitSuite with Logging {

  var logDirZk: File = null
  var logDirBl: File = null
  // var topicLogDir: File = null
  var serverBl: KafkaServer = null
  var serverZk: KafkaServer = null

  var simpleConsumerZk: SimpleConsumer = null
  var simpleConsumerBl: SimpleConsumer = null

  val tLogger = Logger.getLogger(getClass())

  private val brokerZk = 0
  private val brokerBl = 1

  private val ports = TestUtils.choosePorts(2)
  private val (portZk, portBl) = (ports(0), ports(1))

  private var zkServer: EmbeddedZookeeper = null

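  // setUp brings up an embedded ZooKeeper and two brokers: serverZk (built from
  // TestUtils.createBrokerConfig) backs the ZkConnect appender test, while serverBl
  // uses the local createBrokerConfig below with enable.zookeeper=false and is
  // reached directly through the BrokerList appender property.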
  @Before
  def setUp() {
    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)

    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
    val logDirZkPath = propsZk.getProperty("log.dir")
    logDirZk = new File(logDirZkPath)
    serverZk = TestUtils.createServer(new KafkaConfig(propsZk))

    val propsBl: Properties = createBrokerConfig(brokerBl, portBl)
    val logDirBlPath = propsBl.getProperty("log.dir")
    logDirBl = new File(logDirBlPath)
    serverBl = TestUtils.createServer(new KafkaConfig(propsBl))

    Thread.sleep(100)

    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
    simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024)
  }

  @After
  def tearDown() {
    simpleConsumerZk.close
    simpleConsumerBl.close

    serverZk.shutdown
    serverBl.shutdown
    Utils.rm(logDirZk)
    Utils.rm(logDirBl)

    Thread.sleep(500)
    zkServer.shutdown
    Thread.sleep(500)
  }

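  // Configures the appender with one required property left out at a time and
  // expects PropertyConfigurator to surface a MissingConfigException; the final
  // block omits only SerializerClass, which should fall back to the default
  // kafka.serializer.StringEncoder rather than fail.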
  @Test
  def testKafkaLog4jConfigs() {
    var props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")

    // broker connection info (BrokerList/ZkConnect) missing
    try {
      PropertyConfigurator.configure(props)
      fail("Missing properties exception was expected!")
    } catch {
      case e: MissingConfigException =>
    }

    props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")

    // broker connection info (BrokerList/ZkConnect) missing
    try {
      PropertyConfigurator.configure(props)
      fail("Missing properties exception was expected!")
    } catch {
      case e: MissingConfigException =>
    }

    props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:" + portBl.toString)
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")

    // topic missing
    try {
      PropertyConfigurator.configure(props)
      fail("Missing properties exception was expected!")
    } catch {
      case e: MissingConfigException =>
    }

    props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:" + portBl.toString)
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")

    // serializer missing
    try {
      PropertyConfigurator.configure(props)
    } catch {
      case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
    }
  }

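  // Logs five messages through an appender configured with BrokerList and
  // verifies that all five can be fetched back from the standalone broker.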
  @Test
  def testBrokerListLog4jAppends() {
    PropertyConfigurator.configure(getLog4jConfigWithBrokerList)

    for(i <- 1 to 5)
      info("test")

    Thread.sleep(500)

    var offset = 0L
    val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))

    var count = 0
    for(message <- messages) {
      count = count + 1
      offset += message.offset
    }

    assertEquals(5, count)
  }

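  // Same flow as above, but the appender locates the broker through ZooKeeper
  // via ZkConnect, so the messages are fetched from the ZooKeeper-registered broker.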
  @Test
  def testZkConnectLog4jAppends() {
    PropertyConfigurator.configure(getLog4jConfigWithZkConnect)

    for(i <- 1 to 5)
      info("test")

    Thread.sleep(500)

    var offset = 0L
    val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))

    var count = 0
    for(message <- messages) {
      count = count + 1
      offset += message.offset
    }

    assertEquals(5, count)
  }

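  // log4j property sets used by the append tests: one routes the appender through
  // an explicit broker list, the other through the embedded ZooKeeper.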
  private def getLog4jConfigWithBrokerList: Properties = {
    val props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:" + portBl.toString)
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
    props
  }

  private def getLog4jConfigWithZkConnect: Properties = {
    val props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
    props
  }

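  // Broker config with ZooKeeper registration disabled, so the resulting broker
  // is only reachable through an explicit broker list.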
  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
    val props = new Properties
    props.put("brokerid", nodeId.toString)
    props.put("port", port.toString)
    props.put("log.dir", getLogDir.getAbsolutePath)
    props.put("log.flush.interval", "1")
    props.put("enable.zookeeper", "false")
    props.put("num.partitions", "1")
    props.put("log.retention.hours", "10")
    props.put("log.cleanup.interval.mins", "5")
    props.put("log.file.size", "1000")
    props
  }

  private def getLogDir(): File = {
    val dir = TestUtils.tempDir()
    dir
  }
}

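// Encoder referenced by the SerializerClass appender property above: casts the
// log4j event's message to a String and wraps its bytes in a Kafka Message.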
class AppenderStringEncoder extends Encoder[LoggingEvent] {
  def toMessage(event: LoggingEvent): Message = {
    val logMessage = event.getMessage
    new Message(logMessage.asInstanceOf[String].getBytes)
  }
}