/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.producer

import async.AsyncProducer
import java.util.Properties
import org.apache.log4j.{Logger, Level}
import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
import junit.framework.Assert
import org.easymock.EasyMock
import java.util.concurrent.ConcurrentHashMap
import kafka.cluster.Partition
import org.scalatest.junit.JUnitSuite
import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException}
import kafka.utils.{TestUtils, TestZKUtils, Utils}
import kafka.serializer.{StringEncoder, Encoder}
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}

class ProducerTest extends JUnitSuite {
  private val topic = "test-topic"
  private val brokerId1 = 0
  private val brokerId2 = 1
  private val ports = TestUtils.choosePorts(2)
  private val (port1, port2) = (ports(0), ports(1))
  private var server1: KafkaServer = null
  private var server2: KafkaServer = null
  private var producer1: SyncProducer = null
  private var producer2: SyncProducer = null
  private var consumer1: SimpleConsumer = null
  private var consumer2: SimpleConsumer = null
  private var zkServer: EmbeddedZookeeper = null
  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])

  @Before
  def setUp() {
    // set up 2 brokers with 4 partitions each
    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)

    val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
    val config1 = new KafkaConfig(props1) {
      override val numPartitions = 4
    }
    server1 = TestUtils.createServer(config1)

    val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
    val config2 = new KafkaConfig(props2) {
      override val numPartitions = 4
    }
    server2 = TestUtils.createServer(config2)

    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", port1.toString)

    // send one message through each broker so that both register "test-topic" in zookeeper
    producer1 = new SyncProducer(new SyncProducerConfig(props))
    producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                          messages = new Message("test".getBytes())))

    producer2 = new SyncProducer(new SyncProducerConfig(props) {
      override val port = port2
    })
    producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                          messages = new Message("test".getBytes())))

    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)

    // temporarily set request handler logger to a higher level
    requestHandlerLogger.setLevel(Level.FATAL)

    Thread.sleep(500)
  }

  @After
  def tearDown() {
    // restore the request handler logger back to its original level
    requestHandlerLogger.setLevel(Level.ERROR)

    server1.shutdown
    server2.shutdown
    Utils.rm(server1.config.logDir)
    Utils.rm(server2.config.logDir)
    Thread.sleep(500)
    zkServer.shutdown
    Thread.sleep(500)
  }
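
  // Mocks the per-broker SyncProducers and verifies that a partitioned send through the
  // high-level Producer is routed to exactly one broker, as picked by StaticPartitioner.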
  @Test
  def testSend() {
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new ProducerConfig(props)
    val partitioner = new StaticPartitioner
    val serializer = new StringSerializer

    // 2 sync producers
    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
    // "test".length = 4 and there are 8 partitions in total (4 per broker), so the static
    // partitioner picks global partition 4, i.e. partition 0 (first partition) on the second broker
    syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
    EasyMock.expectLastCall
    syncProducer1.close
    EasyMock.expectLastCall
    syncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)
    EasyMock.replay(syncProducer2)

    syncProducers.put(brokerId1, syncProducer1)
    syncProducers.put(brokerId2, syncProducer2)

    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
    producer.close

    EasyMock.verify(syncProducer1)
    EasyMock.verify(syncProducer2)
  }
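
  // With a static broker.list (no zookeeper), the producer cannot look up partition
  // ownership, so it passes partition -1 (random) down to the SyncProducer.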
  @Test
  def testSendSingleMessage() {
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("broker.list", "0:localhost:9092")
    val config = new ProducerConfig(props)
    val partitioner = new StaticPartitioner
    val serializer = new StringSerializer

    // 1 sync producer
    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
    // it should send to a random partition due to use of broker.list
    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
    EasyMock.expectLastCall
    syncProducer1.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)

    syncProducers.put(brokerId1, syncProducer1)

    val producerPool = new ProducerPool[String](config, serializer, syncProducers,
                                                new ConcurrentHashMap[Int, AsyncProducer[String]]())
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    producer.send(new ProducerData[String, String](topic, "t"))
    producer.close

    EasyMock.verify(syncProducer1)
  }
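
  // NegativePartitioner (defined at the bottom of this file) always returns -1, which is
  // out of range and must surface as an InvalidPartitionException.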
  @Test
  def testInvalidPartition() {
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new ProducerConfig(props)
    val richProducer = new Producer[String, String](config)
    try {
      richProducer.send(new ProducerData[String, String](topic, "test", Array("test")))
      Assert.fail("Should fail with InvalidPartitionException")
    } catch {
      case e: InvalidPartitionException => // expected, do nothing
    } finally {
      richProducer.close()
    }
  }
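
  // Without serializer.class the producer falls back to the default encoder, which only
  // passes Message payloads through: String payloads must fail, Message payloads must succeed.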
  @Test
  def testDefaultEncoder() {
    val props = new Properties()
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new ProducerConfig(props)

    val stringProducer1 = new Producer[String, String](config)
    try {
      stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test")))
      fail("Should fail with ClassCastException due to incompatible Encoder")
    } catch {
      case e: ClassCastException => // expected
    } finally {
      stringProducer1.close()
    }

    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
    stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
    stringProducer2.close()

    val messageProducer1 = new Producer[String, Message](config)
    try {
      messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
    } catch {
      case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
    } finally {
      messageProducer1.close()
    }
  }
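
  // Exercises ProducerPool directly in sync mode: the pool must dispatch to the
  // SyncProducer registered for the target broker and close all producers on close().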
  @Test
  def testSyncProducerPool() {
    // 2 sync producers
    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
    syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
    EasyMock.expectLastCall
    syncProducer1.close
    EasyMock.expectLastCall
    syncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)
    EasyMock.replay(syncProducer2)

    syncProducers.put(brokerId1, syncProducer1)
    syncProducers.put(brokerId2, syncProducer2)

    // default for producer.type is "sync"
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
                                                syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
    producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))

    producerPool.close
    EasyMock.verify(syncProducer1)
    EasyMock.verify(syncProducer2)
  }
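
  // Same pool dispatch check as above, but with producer.type = "async" and mocked AsyncProducers.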
  @Test
  def testAsyncProducerPool() {
    // 2 async producers
    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
    asyncProducer1.send(topic, "test1", 0)
    EasyMock.expectLastCall
    asyncProducer1.close
    EasyMock.expectLastCall
    asyncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(asyncProducer1)
    EasyMock.replay(asyncProducer2)

    asyncProducers.put(brokerId1, asyncProducer1)
    asyncProducers.put(brokerId2, asyncProducer2)

    // change producer.type to "async"
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("producer.type", "async")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
                                                new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
    producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))

    producerPool.close
    EasyMock.verify(asyncProducer1)
    EasyMock.verify(asyncProducer2)
  }
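
  // Only brokerId2 has a producer in the pool; sending to a partition on brokerId1
  // must raise UnavailableProducerException.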
  @Test
  def testSyncUnavailableProducerException() {
    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
    syncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)
    EasyMock.replay(syncProducer2)

    // deliberately register only the producer for brokerId2
    syncProducers.put(brokerId2, syncProducer2)

    // default for producer.type is "sync"
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
                                                syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
    try {
      producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
      Assert.fail("Should fail with UnavailableProducerException")
    } catch {
      case e: UnavailableProducerException => // expected
    }

    producerPool.close
    EasyMock.verify(syncProducer1)
    EasyMock.verify(syncProducer2)
  }
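
  // Async-mode counterpart of the previous test.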
  @Test
  def testAsyncUnavailableProducerException() {
    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
    asyncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(asyncProducer1)
    EasyMock.replay(asyncProducer2)

    // deliberately register only the producer for brokerId2
    asyncProducers.put(brokerId2, asyncProducer2)

    // change producer.type to "async"
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("producer.type", "async")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
                                                new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
    try {
      producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
      Assert.fail("Should fail with UnavailableProducerException")
    } catch {
      case e: UnavailableProducerException => // expected
    }

    producerPool.close
    EasyMock.verify(asyncProducer1)
    EasyMock.verify(asyncProducer2)
  }
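
  // partitioner.class and broker.list are mutually exclusive: a static broker list cannot
  // support custom partitioning, so constructing the config must be rejected.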
  @Test
  def testConfigBrokerPartitionInfoWithPartitioner() {
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("producer.type", "async")
    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," +
                             brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4)

    try {
      new ProducerConfig(props)
      fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
    } catch {
      case e: InvalidConfigException => // expected
    }
  }
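
  // With broker.list alone (no partitioner), an async send should go to a random
  // partition (-1) on the single configured broker.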
  @Test
  def testConfigBrokerPartitionInfo() {
    val props = new Properties()
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("producer.type", "async")
    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
    val config = new ProducerConfig(props)
    val partitioner = new StaticPartitioner
    val serializer = new StringSerializer

    // 1 async producer
    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
    // it should send to a random partition due to use of broker.list
    asyncProducer1.send(topic, "test1", -1)
    EasyMock.expectLastCall
    asyncProducer1.close
    EasyMock.expectLastCall
    EasyMock.replay(asyncProducer1)

    asyncProducers.put(brokerId1, asyncProducer1)

    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
    producer.close

    EasyMock.verify(asyncProducer1)
  }
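
  // End-to-end: sends to a topic that does not exist yet and checks, via the SimpleConsumers,
  // that each broker ends up with one message as the topic's partitions register in zookeeper.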
  @Test
  def testZKSendToNewTopic() {
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)

    val config = new ProducerConfig(props)
    val serializer = new StringEncoder

    val producer = new Producer[String, String](config)
    try {
      // Available (broker id, partition id) pairs at this stage should be (0,0) and (1,0).
      // Since "test".length = 4 and 4 % 2 = 0, this should send the message to broker 0 on partition 0
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
      Thread.sleep(100)
      // Available (broker id, partition id) pairs at this stage should be (0,0), (0,1), (0,2), (0,3) and (1,0).
      // Since 4 % 5 = 4, this should send the message to broker 1 on partition 0
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
      Thread.sleep(100)
      // cross check if brokers got the messages
      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
      val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
      Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
    } catch {
      case e: Exception => fail("Not expected", e)
    } finally {
      producer.close
    }
  }
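
  // Like the previous test, but broker 1 is shut down between the two sends: the partitioner
  // must fall back to the remaining live partitions on broker 0.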
  @Test
  def testZKSendWithDeadBroker() {
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)

    val config = new ProducerConfig(props)
    val serializer = new StringEncoder

    val producer = new Producer[String, String](config)
    try {
      // Available (broker id, partition id) pairs at this stage should be (0,0) and (1,0).
      // Hence, this should send the message to broker 0 on partition 0
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
      Thread.sleep(100)
      // kill 2nd broker
      server2.shutdown
      Thread.sleep(100)
      // Available (broker id, partition id) pairs at this stage should be (0,0), (0,1), (0,2), (0,3) and (1,0).
      // Since 4 % 5 = 4, in a normal case it would send to broker 1 on partition 0. But broker 1 is down,
      // so only 4 partitions remain and 4 % 4 = 0: the message should go to broker 0 on partition 0
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
      Thread.sleep(100)
      // cross check that broker 0 got both messages
      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
      Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
    } catch {
      case e: Exception => fail("Not expected", e)
    } finally {
      producer.close
    }
  }
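
  // After both original brokers go away, a freshly started broker is registered under
  // /brokers/ids but not under the existing topic; the producer must still deliver to it.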
  @Test
  def testZKSendToExistingTopicWithNoBrokers() {
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)

    val config = new ProducerConfig(props)
    val serializer = new StringEncoder

    val producer = new Producer[String, String](config)
    var server: KafkaServer = null
    try {
      // shutdown server1
      server1.shutdown
      Thread.sleep(100)
      // The only available (broker id, partition id) pair at this stage should be (1,0),
      // so this should send the message to broker 1 on partition 0
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
      Thread.sleep(100)
      // cross check if broker 1 got the message
      val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
      Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)

      // shutdown server2
      server2.shutdown
      Thread.sleep(100)
      // delete the new-topic logs
      Utils.rm(server2.config.logDir)
      Thread.sleep(100)
      // start a broker up again, so broker 0 exists under /brokers/ids, but nothing exists
      // under /brokers/topics/new-topic
      val props2 = TestUtils.createBrokerConfig(brokerId1, port1)
      val config2 = new KafkaConfig(props2) {
        override val numPartitions = 4
      }
      server = TestUtils.createServer(config2)
      Thread.sleep(100)

      // now no partitions are registered under new-topic; the producer should fall back to the live broker
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
      Thread.sleep(100)
      // cross check if the restarted broker got the message
      val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
      Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
    } catch {
      case e: Exception => fail("Not expected", e)
    } finally {
      // guard against an exception thrown before the replacement server was started
      if(server != null)
        server.shutdown
      producer.close
    }
  }
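
  // Mock-level check that sends to a brand-new topic hit partition 0 on broker 0, both
  // before and after the topic's partitions get registered in zookeeper by a real producer.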
  @Test
  def testPartitionedSendToNewTopic() {
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new ProducerConfig(props)
    val partitioner = new StaticPartitioner
    val serializer = new StringSerializer

    // 2 sync producers
    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                                  messages = new Message("test1".getBytes)))
    EasyMock.expectLastCall
    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                                  messages = new Message("test1".getBytes)))
    EasyMock.expectLastCall
    syncProducer1.close
    EasyMock.expectLastCall
    syncProducer2.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)
    EasyMock.replay(syncProducer2)

    syncProducers.put(brokerId1, syncProducer1)
    syncProducers.put(brokerId2, syncProducer2)

    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
    Thread.sleep(100)

    // now send again to this topic using a real producer, this time all brokers would have registered
    // their partitions in zookeeper
    producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                           messages = new Message("test".getBytes())))

    // wait for zookeeper to register the new topic
    Thread.sleep(100)

    producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1")))
    Thread.sleep(100)

    producer.close
    EasyMock.verify(syncProducer1)
    EasyMock.verify(syncProducer2)
  }
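
  // A third broker joins an existing topic; once it registers in zookeeper, the static
  // partitioner should route the next send to a partition on that new broker.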
  @Test
  def testPartitionedSendToNewBrokerInExistingTopic() {
    val props = new Properties()
    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new ProducerConfig(props)
    val partitioner = new StaticPartitioner
    val serializer = new StringSerializer

    // 3 sync producers
    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
    val syncProducer3 = EasyMock.createMock(classOf[SyncProducer])
    // "test-topic".length = 10 and there will be 12 partitions (3 brokers * 4), so
    // 10 % 12 = 10 maps to partition 2 on the new broker (id 2)
    syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                                 messages = new Message("test1".getBytes)))
    EasyMock.expectLastCall
    syncProducer1.close
    EasyMock.expectLastCall
    syncProducer2.close
    EasyMock.expectLastCall
    syncProducer3.close
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer1)
    EasyMock.replay(syncProducer2)
    EasyMock.replay(syncProducer3)

    syncProducers.put(brokerId1, syncProducer1)
    syncProducers.put(brokerId2, syncProducer2)
    syncProducers.put(2, syncProducer3)

    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    // start the third broker
    val port = TestUtils.choosePort
    val serverProps = TestUtils.createBrokerConfig(2, port)
    val serverConfig = new KafkaConfig(serverProps) {
      override val numPartitions = 4
    }
    val server3 = TestUtils.createServer(serverConfig)
    Thread.sleep(500)

    // send a message to the new broker to register it under topic "test-topic"
    val tempProps = new Properties()
    tempProps.put("host", "localhost")
    tempProps.put("port", port.toString)
    val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
    tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                             messages = new Message("test".getBytes())))
    Thread.sleep(500)

    producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
    producer.close

    EasyMock.verify(syncProducer1)
    EasyMock.verify(syncProducer2)
    EasyMock.verify(syncProducer3)

    server3.shutdown
    Utils.rm(server3.config.logDir)
  }
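
  // DefaultPartitioner with a static broker.list: as in testConfigBrokerPartitionInfo,
  // the async send should go to a random partition (-1).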
  @Test
  def testDefaultPartitioner() {
    val props = new Properties()
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("producer.type", "async")
    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
    val config = new ProducerConfig(props)
    val partitioner = new DefaultPartitioner[String]
    val serializer = new StringSerializer

    // 1 async producer
    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
    // it should send to a random partition due to use of broker.list
    asyncProducer1.send(topic, "test1", -1)
    EasyMock.expectLastCall
    asyncProducer1.close
    EasyMock.expectLastCall
    EasyMock.replay(asyncProducer1)

    asyncProducers.put(brokerId1, asyncProducer1)

    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)

    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
    producer.close

    EasyMock.verify(asyncProducer1)
  }
}
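
// Test helpers: a simple string Encoder and the Partitioner implementations used above.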
class StringSerializer extends Encoder[String] {
  def toEvent(message: Message): String = message.toString
  def toMessage(event: String): Message = new Message(event.getBytes)
  def getTopic(event: String): String = event.concat("-topic")
}

// Always returns an out-of-range partition; used to trigger InvalidPartitionException.
class NegativePartitioner extends Partitioner[String] {
  def partition(data: String, numPartitions: Int): Int = -1
}

// Deterministic partitioner: the partition depends only on the key's length.
class StaticPartitioner extends Partitioner[String] {
  def partition(data: String, numPartitions: Int): Int = data.length % numPartitions
}

class HashPartitioner extends Partitioner[String] {
  def partition(data: String, numPartitions: Int): Int = {
    // hashCode can be negative, so normalize the result into [0, numPartitions)
    (data.hashCode % numPartitions + numPartitions) % numPartitions
  }
}