| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.system.kafka |
| |
| import org.apache.kafka.clients.producer._ |
| import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException} |
| import org.apache.kafka.test.MockSerializer |
| import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream} |
| import org.junit.Assert._ |
| import org.junit.Test |
| import org.scalatest.Assertions.intercept |
| |
| |
| class TestKafkaSystemProducer { |
// Shared stream used as the destination of most envelopes built in this suite.
val systemStream: SystemStream = new SystemStream("testSystem", "testStream")
// Ready-made envelope on `systemStream` for tests that only need "some message".
val someMessage: OutgoingMessageEnvelope = new OutgoingMessageEnvelope(systemStream, "test".getBytes)
| |
/**
 * Sends one envelope through a KafkaSystemProducer backed by Kafka's own MockProducer and
 * checks that exactly one record shows up in the mock's history.
 */
@Test
def testKafkaProducer(): Unit = {
  // NOTE(review): the leading `true` is presumably MockProducer's auto-complete flag — confirm against the Kafka Javadoc.
  val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer)
  val systemProducer = new KafkaSystemProducer(systemName = "test",
    getProducer = () => mockProducer,
    metrics = new KafkaSystemProducerMetrics)
  systemProducer.register("test")
  systemProducer.start() // side-effecting call: keep the parentheses, like every other test here
  systemProducer.send("test", someMessage)

  // Look at the producer the system producer currently holds rather than the local reference.
  val activeProducer = systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]]
  assertEquals(1, activeProducer.history().size())
  systemProducer.stop()
}
| |
/**
 * Smoke test against the project's MockKafkaProducer: a single send through the system
 * producer must be counted as exactly one message sent by the mock.
 */
@Test
def testKafkaProducerUsingMockKafkaProducer(): Unit = {
  val source = "test"
  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producerMetrics = new KafkaSystemProducerMetrics
  val systemProducer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => mockProducer,
    metrics = producerMetrics)

  systemProducer.register(source)
  systemProducer.start()
  systemProducer.send(source, someMessage)

  assertEquals(1, mockProducer.getMsgsSent)
  systemProducer.stop()
}
| |
/**
 * Envelopes sent while the mock is buffering must not be counted as sent until the mock's
 * delayed send thread has run to completion.
 */
@Test
def testKafkaProducerBufferedSend(): Unit = {
  def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
  val immediate = envelope("a")
  val bufferedFirst = envelope("b")
  val bufferedSecond = envelope("c")

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producerMetrics = new KafkaSystemProducerMetrics
  val systemProducer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => mockProducer,
    metrics = producerMetrics)
  systemProducer.register("test")
  systemProducer.start()

  // Sent before buffering is switched on, so it is the one message counted below.
  systemProducer.send("test", immediate)

  mockProducer.setShouldBuffer(true)
  Seq(bufferedFirst, bufferedSecond).foreach(buffered => systemProducer.send("test", buffered))
  assertEquals(1, mockProducer.getMsgsSent)

  // Block until the delayed send thread has finished before counting again.
  mockProducer.startDelayedSendThread(2000).join()

  assertEquals(3, mockProducer.getMsgsSent)
  systemProducer.stop()
}
| |
/**
 * flush() must not return before the buffered envelopes have been delivered: after it
 * returns, all three messages are counted as sent.
 */
@Test
def testKafkaProducerFlushSuccessful(): Unit = {
  val envelopes = Seq("a", "b", "c").map(text => new OutgoingMessageEnvelope(systemStream, text.getBytes))

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val systemProducer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => mockProducer,
    metrics = new KafkaSystemProducerMetrics)
  systemProducer.register("test")
  systemProducer.start()

  // The first envelope goes out unbuffered; the remaining two are held back by the mock.
  systemProducer.send("test", envelopes.head)
  mockProducer.setShouldBuffer(true)
  envelopes.tail.foreach(held => systemProducer.send("test", held))
  assertEquals(1, mockProducer.getMsgsSent)

  // The delayed thread releases the buffered sends while flush() is waiting on them.
  mockProducer.startDelayedSendThread(2000)
  systemProducer.flush("test")

  assertEquals(3, mockProducer.getMsgsSent)
  systemProducer.stop()
}
| |
/**
 * When one of several buffered sends fails, flush() must throw a SystemProducerException
 * while the other buffered messages are still delivered.
 */
@Test
def testKafkaProducerFlushWithException(): Unit = {
  val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
  val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
  val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
  val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val systemProducer = new KafkaSystemProducer(systemName = "test",
    getProducer = () => mockProducer,
    metrics = new KafkaSystemProducerMetrics())
  systemProducer.register("test")
  systemProducer.start()
  systemProducer.send("test", msg1)

  mockProducer.setShouldBuffer(true)
  systemProducer.send("test", msg2)
  // Arm the mock so that the next send (msg3) is the one that fails.
  mockProducer.setErrorNext(true, true, new RecordTooLargeException())
  systemProducer.send("test", msg3)
  systemProducer.send("test", msg4)

  assertEquals(1, mockProducer.getMsgsSent)

  mockProducer.startDelayedSendThread(2000)
  // intercept already fails the test unless a SystemProducerException is thrown, so no
  // additional isInstanceOf check on the result is needed.
  intercept[SystemProducerException] {
    systemProducer.flush("test")
  }
  assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent
  systemProducer.stop()
}
| |
/**
 * A TimeoutException injected for the next send must surface from flush() as a
 * SystemProducerException that wraps the TimeoutException two levels down, and the failed
 * message must not be counted as sent.
 */
@Test
def testKafkaProducerWithRetriableException(): Unit = {
  val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
  val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
  val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
  val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producerMetrics = new KafkaSystemProducerMetrics()
  val producer = new KafkaSystemProducer(systemName = "test",
    getProducer = () => mockProducer,
    metrics = producerMetrics)

  producer.register("test")
  producer.start()

  // Three healthy sends followed by a clean flush establish the baseline count of 3.
  producer.send("test", msg1)
  producer.send("test", msg2)
  producer.send("test", msg3)
  producer.flush("test")

  mockProducer.setErrorNext(true, true, new TimeoutException())

  producer.send("test", msg4)
  // intercept guarantees the static type; only the cause chain needs checking.
  val thrown = intercept[SystemProducerException] {
    producer.flush("test")
  }
  assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException])
  assertEquals(3, mockProducer.getMsgsSent)
  producer.stop()
}
| |
/**
 * If there's an exception, we should:
 *  1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going
 *     out of order.
 *  2. Record the original exception.
 *  3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
 *
 * Assumptions:
 *  1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular
 *     TaskInstance (task.async.commit).
 *  2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance.
 *  3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container.
 *  4. A single KafkaProducer is shared by all the tasks, so any failure from one task can affect the others.
 *
 * Conclusions:
 *  It is only safe to handle the async exceptions by closing the producer and failing the container.
 *  This prevents race conditions with setting/clearing exceptions and recreating the producer that could
 *  cause data loss by checkpointing a failed offset.
 *
 * Inaccuracies:
 *  A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a
 *  batch all fail or they all succeed together. This test, however, fails individual callbacks in order to
 *  test boundary conditions where the batches align perfectly around the failed send().
 */
@Test
def testKafkaProducerWithFatalExceptions(): Unit = {
  // Every failure raised after the injected error must wrap the original RecordTooLargeException two levels down.
  def assertWrapsRecordTooLarge(failure: SystemProducerException): Unit =
    assertTrue(failure.getCause.getCause.isInstanceOf[RecordTooLargeException])

  def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
  val first = envelope("a")
  val second = envelope("b")
  val failing = envelope("c")
  val afterFailure = envelope("d")
  val afterFlush = envelope("e")
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => {
      mockProducer.open() // A new producer would not already be closed, so reset it.
      mockProducer
    },
    metrics = producerMetrics)
  producer.register("test")
  producer.start()

  Seq(first, second).foreach(healthy => producer.send("test", healthy))
  mockProducer.setErrorNext(true, true, new RecordTooLargeException())
  producer.send("test", failing) // Callback exception
  assertTrue(mockProducer.isClosed)
  assertNotNull(producer.producerRef.get())
  assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

  // Should fail because the producer is closed.
  assertWrapsRecordTooLarge(intercept[SystemProducerException] {
    producer.send("test", afterFailure)
  })

  // Should throw the callback exception.
  assertWrapsRecordTooLarge(intercept[SystemProducerException] {
    producer.flush("test")
  })

  // Should not be able to send again after flush.
  assertWrapsRecordTooLarge(intercept[SystemProducerException] {
    producer.send("test", afterFlush)
  })

  // Should rethrow the exception.
  assertWrapsRecordTooLarge(intercept[SystemProducerException] {
    producer.flush("test")
  })

  assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent
  producer.stop()
}
| |
/**
 * Recapping from [[testKafkaProducerWithFatalExceptions]]:
 *
 * If there's an exception, we should:
 *  1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going
 *     out of order.
 *  2. Record the original exception.
 *  3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
 *
 * This test focuses on point 3. In particular, it ensures that the failures are handled properly across
 * multiple sources which share the same producer.
 */
@Test
def testKafkaProducerWithFatalExceptionsMultipleSources(): Unit = {
  val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
  val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
  val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
  val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
  val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
  val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
  val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producer = new KafkaSystemProducer(systemName = "test",
    getProducer = () => {
      mockProducer.open() // A new producer would not already be closed, so reset it.
      mockProducer
    },
    metrics = producerMetrics)
  producer.register("test1")
  producer.register("test2")

  producer.start()

  // Initial sends
  producer.send("test1", msg1)
  producer.send("test2", msg2)

  // Inject error for next send
  mockProducer.setErrorNext(true, true, new RecordTooLargeException())
  producer.send("test1", msg3) // Callback exception
  assertTrue(mockProducer.isClosed)
  assertNotNull(producer.producerRef.get())
  assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

  // Subsequent sends
  val sameSourceSendFailure = intercept[SystemProducerException] {
    producer.send("test1", msg4) // Should fail because the producer is closed.
  }
  assertTrue(sameSourceSendFailure.getCause.getCause.isInstanceOf[RecordTooLargeException])

  val otherSourceFirstSendFailure = intercept[SystemProducerException] {
    producer.send("test2", msg4) // First send from separate source gets a producer closed exception
  }
  assertTrue(otherSourceFirstSendFailure.getCause.getCause.isInstanceOf[RecordTooLargeException])

  val otherSourceSecondSendFailure = intercept[SystemProducerException] {
    producer.send("test2", msg5) // Second send should still get the error
  }
  assertTrue(otherSourceSecondSendFailure.getCause.getCause.isInstanceOf[RecordTooLargeException])

  // Flushes. intercept already pins the exception type, so only the cause chain is asserted.
  val otherSourceFlushFailure = intercept[SystemProducerException] {
    producer.flush("test2") // Should rethrow the closed exception in flush
  }
  assertTrue(otherSourceFlushFailure.getCause.getCause.isInstanceOf[RecordTooLargeException])
  intercept[SystemProducerException] {
    producer.send("test2", msg6) // Should still not be able to send after flush
  }

  val sameSourceFlushFailure = intercept[SystemProducerException] {
    producer.flush("test1") // Should throw the callback exception
  }
  assertTrue(sameSourceFlushFailure.getCause.getCause.isInstanceOf[RecordTooLargeException])

  intercept[SystemProducerException] {
    producer.send("test1", msg7) // Should still not be able to send after flush
  }

  intercept[SystemProducerException] {
    producer.flush("test1") // Should throw the callback exception
  }
  assertEquals(2, mockProducer.getMsgsSent) // only msg1 and msg2, sent before the error
  producer.stop()
}
| |
/**
 * An exception thrown synchronously from send() (as opposed to from the callback) must be
 * reported to the caller but leave the shared producer open and usable: the failed message
 * can be resent, other sources are unaffected, and flush() succeeds.
 */
@Test
def testKafkaProducerWithNonFatalExceptionsMultipleSources(): Unit = {
  def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
  val first = envelope("a")
  val second = envelope("b")
  val retried = envelope("c")
  val fromOtherSource = envelope("d")
  val afterFlush = envelope("e")
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => {
      mockProducer.open() // A new producer would not already be closed, so reset it.
      mockProducer
    },
    metrics = producerMetrics)
  Seq("test1", "test2").foreach(source => producer.register(source))
  producer.start()

  producer.send("test1", first)
  producer.send("test2", second)

  mockProducer.setErrorNext(true, false, new SerializationException())
  val sendException = intercept[SystemProducerException] {
    producer.send("test1", retried) // User-thread exception
  }
  // Unlike callback failures, the original exception is the direct cause here.
  assertTrue(sendException.getCause.isInstanceOf[SerializationException])
  assertFalse(mockProducer.isClosed)
  assertNotNull(producer.producerRef.get())
  assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

  producer.send("test1", retried) // Should be able to resend the failed message
  producer.send("test2", fromOtherSource) // Second source should not be affected

  producer.flush("test1") // Flush should be unaffected

  producer.send("test1", afterFlush) // Should be able to send again after flush

  // Five successful sends in total: the two initial ones, the resend, the other source's and the post-flush one.
  assertEquals(5, mockProducer.getMsgsSent)
  producer.stop()
}
| |
/**
 * If there's an exception and the user configured task.drop.producer.errors, we should:
 *  1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going
 *     out of order.
 *  2. Recreate the producer.
 *  3. Ignore any messages that were dropped (the user knows they're signing up for this if they enable
 *     the option).
 *
 * Assumptions:
 *  1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular
 *     TaskInstance (task.async.commit).
 *  2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance.
 *  3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container.
 *  4. A single KafkaProducer is shared by all the tasks, so any failure from one task can affect the others.
 *
 * Conclusions:
 *  If the user is ok with dropping messages for the sake of availability, we will swallow all exceptions
 *  and recreate the producer to recover. There are no guarantees how many messages are lost, but the
 *  send-failed metric should be accurate, so users should alert on that.
 *
 * Inaccuracies:
 *  A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a
 *  batch all fail or they all succeed together. This test, however, fails individual callbacks in order to
 *  test boundary conditions where the batches align perfectly around the failed send().
 */
@Test
def testKafkaProducerWithFatalExceptionsDroppingExceptions(): Unit = {
  def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
  val first = envelope("a")
  val second = envelope("b")
  val dropped = envelope("c")
  val afterRecovery = envelope("d")
  val afterFlush = envelope("e")
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  // Compares the number of times the factory below has (re)opened the mock with the expected value.
  def assertOpenCount(message: String, expected: Int): Unit =
    assertEquals(message, expected, mockProducer.getOpenCount)

  val producer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => {
      mockProducer.open() // A new producer would not already be closed, so reset it.
      mockProducer
    },
    metrics = producerMetrics,
    dropProducerExceptions = true) // Here's where we enable exception dropping.
  producer.register("test")
  producer.start()

  Seq(first, second).foreach(healthy => producer.send("test", healthy))
  mockProducer.setErrorNext(true, true, new RecordTooLargeException())
  producer.send("test", dropped) // Callback exception
  // The failed producer is closed and released, but not replaced until the next send.
  assertTrue(mockProducer.isClosed)
  assertNull(producer.producerRef.get())
  assertOpenCount("Should not have created a new producer", 1)

  producer.send("test", afterRecovery) // Should succeed because the producer recovered.
  assertFalse(mockProducer.isClosed)
  assertNotNull(producer.producerRef.get())
  assertOpenCount("Should have created a new producer", 2)
  producer.flush("test") // Should not throw

  producer.send("test", afterFlush) // Should be able to send again after flush
  assertOpenCount("Should not have created a new producer", 2)
  producer.flush("test")

  assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
  producer.stop()
}
| |
/**
 * Recapping from [[testKafkaProducerWithFatalExceptionsDroppingExceptions]]:
 *
 * If there's an exception, we should:
 *  1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going
 *     out of order.
 *  2. Recreate the producer.
 *  3. Ignore any messages that were dropped (the user knows they're signing up for this if they enable
 *     the option).
 *
 * This test ensures that the failures are handled properly across multiple sources which share the same
 * producer.
 */
@Test
def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions(): Unit = {
  val payloads = Seq("a", "b", "c", "d", "e", "f", "g")
  val envelopes = payloads.map(text => new OutgoingMessageEnvelope(systemStream, text.getBytes))
  val Seq(msg1, msg2, msg3, msg4, msg5, msg6, msg7) = envelopes
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val reopenMock = () => {
    mockProducer.open() // A new producer would not already be closed, so reset it.
    mockProducer
  }
  val producer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = reopenMock,
    metrics = producerMetrics,
    dropProducerExceptions = true) // Here's where we enable exception dropping.
  Seq("test1", "test2").foreach(source => producer.register(source))

  producer.start()

  // Initial sends
  producer.send("test1", msg1)
  producer.send("test2", msg2)

  // Inject error for next send
  mockProducer.setErrorNext(true, true, new RecordTooLargeException())
  producer.send("test1", msg3) // Callback exception
  assertTrue(mockProducer.isClosed)
  assertNull(producer.producerRef.get())
  assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)

  // Subsequent sends
  producer.send("test1", msg4) // Should succeed because the producer recovered.
  assertFalse(mockProducer.isClosed)
  assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
  assertNotNull(producer.producerRef.get())
  producer.send("test2", msg5) // Second source should also not have any error.
  assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)

  // Flushes: neither source may see the dropped failure, and both can keep sending afterwards.
  producer.flush("test2")
  producer.send("test2", msg6)

  producer.flush("test1")
  producer.send("test1", msg7)

  assertEquals(6, mockProducer.getMsgsSent) // every message except the one with the error should get sent
  producer.stop()
}
| |
/**
 * Even with exception dropping enabled, an exception thrown synchronously from send() is
 * still reported to the caller, and the producer is neither closed nor recreated; later
 * sends and flushes from both sources proceed normally.
 */
@Test
def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions(): Unit = {
  def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
  val first = envelope("a")
  val second = envelope("b")
  val retried = envelope("c")
  val fromOtherSource = envelope("d")
  val afterFlush = envelope("e")
  val producerMetrics = new KafkaSystemProducerMetrics()

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val producer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => {
      mockProducer.open() // A new producer would not already be closed, so reset it.
      mockProducer
    },
    metrics = producerMetrics,
    dropProducerExceptions = true) // Here's where we enable exception dropping.
  producer.register("test1")
  producer.register("test2")
  producer.start()

  producer.send("test1", first)
  producer.send("test2", second)

  mockProducer.setErrorNext(true, false, new SerializationException())
  val sendException = intercept[SystemProducerException] {
    producer.send("test1", retried) // User-thread exception
  }
  assertTrue(sendException.getCause.isInstanceOf[SerializationException])
  assertFalse(mockProducer.isClosed)
  assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated
  assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)

  // The resend goes through on the very same producer instance.
  producer.send("test1", retried)
  assertFalse(mockProducer.isClosed)
  assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
  assertNotNull(producer.producerRef.get())
  producer.send("test2", fromOtherSource) // Second source should not be affected

  producer.flush("test1") // Flush should be unaffected

  producer.send("test1", afterFlush) // Should be able to send again after flush

  // Five successful sends in total: the two initial ones, the resend, the other source's and the post-flush one.
  assertEquals(5, mockProducer.getMsgsSent)
  producer.stop()
}
| |
/**
 * stop() must deliver everything that is still buffered, for every registered source:
 * one message is sent up front, three are buffered, and all four are counted after stop().
 */
@Test
def testKafkaProducerFlushMsgsWhenStop(): Unit = {
  val unbuffered = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
  val bufferedForFirstSource = Seq("b", "c").map(text => new OutgoingMessageEnvelope(systemStream, text.getBytes))
  val bufferedForSecondSource = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)

  val mockProducer = new MockKafkaProducer(1, "test", 1)
  val systemProducer = new KafkaSystemProducer(
    systemName = "test",
    getProducer = () => mockProducer,
    metrics = new KafkaSystemProducerMetrics)
  Seq("test", "test2").foreach(source => systemProducer.register(source))
  systemProducer.start()
  systemProducer.send("test", unbuffered)

  mockProducer.setShouldBuffer(true)
  bufferedForFirstSource.foreach(held => systemProducer.send("test", held))
  systemProducer.send("test2", bufferedForSecondSource)
  assertEquals(1, mockProducer.getMsgsSent)

  // Starting the delayed thread must not, by itself, change the count immediately.
  mockProducer.startDelayedSendThread(2000)
  assertEquals(1, mockProducer.getMsgsSent)

  systemProducer.stop()
  assertEquals(4, mockProducer.getMsgsSent)
}
| |
/**
 * send() must reject an envelope whose stream name is null or empty with an
 * IllegalArgumentException that names the offending system stream, and nothing may reach
 * the underlying producer.
 */
@Test
def testSystemStreamNameNullOrEmpty(): Unit = {
  val omeStreamNameNull = new OutgoingMessageEnvelope(new SystemStream("test", null), "a".getBytes)
  val omeStreamNameEmpty = new OutgoingMessageEnvelope(new SystemStream("test", ""), "a".getBytes)
  val mockProducer = new MockKafkaProducer(1, "testMock", 1)
  val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer,
    metrics = new KafkaSystemProducerMetrics)

  // Only the call that is expected to throw lives inside intercept. Anything placed after it
  // in the same block would be skipped, so the sent-count checks sit outside.
  producer.register("test1")
  producer.start()
  val thrownNull = intercept[IllegalArgumentException] {
    producer.send("testSrc1", omeStreamNameNull)
  }
  assertEquals(0, mockProducer.getMsgsSent)

  producer.register("test2")
  producer.start()
  val thrownEmpty = intercept[IllegalArgumentException] {
    producer.send("testSrc2", omeStreamNameEmpty)
  }
  assertEquals(0, mockProducer.getMsgsSent)

  // assertEquals reports both strings on failure, unlike assertTrue(a == b).
  assertEquals("Invalid system stream: " + omeStreamNameNull.getSystemStream, thrownNull.getMessage())
  assertEquals("Invalid system stream: " + omeStreamNameEmpty.getSystemStream, thrownEmpty.getMessage())
}
| } |