For example, here is a StreamApplication that validates page views and decorates them with the viewer's profile information:
{% highlight java %}
public class BadPageViewFilterApplication implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDesc) {
    // Kafka system named "test"; the test must use an in-memory system with the same name
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("test");
    KafkaInputDescriptor<PageView> pageViewInput =
        kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
    KafkaOutputDescriptor<DecoratedPageView> outputPageViews =
        kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));

    // Drop invalid page views, decorate the rest with profile data, and write them out
    MessageStream<PageView> pageViews = appDesc.getInputStream(pageViewInput);
    pageViews.filter(this::isValidPageView)
        .map(this::addProfileInformation)
        .sendTo(appDesc.getOutputStream(outputPageViews));
  }

  // isValidPageView(PageView) and addProfileInformation(PageView) are omitted for brevity
}
{% endhighlight %}
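The application references `this::isValidPageView` and `this::addProfileInformation` without showing them. The sketch below is hypothetical: the `PageView` and `DecoratedPageView` fields (`viewerId`, `pageKey`, `viewerName`), the validity rule, and the map-based profile lookup are all assumptions for illustration. Keeping such helpers as pure functions means they can be unit-tested directly, without starting a TestRunner.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical message types; the real PageView/DecoratedPageView are not shown in this guide.
final class PageView {
  final String viewerId;
  final String pageKey;

  PageView(String viewerId, String pageKey) {
    this.viewerId = viewerId;
    this.pageKey = pageKey;
  }
}

final class DecoratedPageView {
  final PageView pageView;
  final String viewerName;

  DecoratedPageView(PageView pageView, String viewerName) {
    this.pageView = pageView;
    this.viewerName = viewerName;
  }
}

// Hypothetical helpers matching the method references used in describe().
final class PageViewLogic {
  // Assumed profile source; a real job might use a table or a remote store instead.
  private final Map<String, String> viewerNames;

  PageViewLogic(Map<String, String> viewerNames) {
    this.viewerNames = new HashMap<>(viewerNames);
  }

  // Assumed validity rule: both identifiers must be present and non-empty.
  boolean isValidPageView(PageView pv) {
    return pv != null
        && pv.viewerId != null && !pv.viewerId.isEmpty()
        && pv.pageKey != null && !pv.pageKey.isEmpty();
  }

  // Attaches the viewer's name, or "unknown" when no profile is found.
  DecoratedPageView addProfileInformation(PageView pv) {
    return new DecoratedPageView(pv, viewerNames.getOrDefault(pv.viewerId, "unknown"));
  }
}
```

Because `PageViewLogic` holds no Samza state, a plain JUnit test can cover edge cases (empty IDs, missing profiles) quickly, leaving the TestRunner test to verify the wiring.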
There are four simple steps to write a test for your stream processing logic and assert on its output:
Step 1: Configure an in-memory system. The example application uses a Kafka system named "test", so configure an equivalent in-memory system. Its name must be the same as the one used in the job:
{% highlight java %}
InMemorySystemDescriptor inMemory = new InMemorySystemDescriptor("test");
{% endhighlight %}
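Step 2: Describe the input and output streams. An input stream is described by an InMemoryInputDescriptor. Because your job reads from input streams, they must be initialized with messages (data); you supply these when you add the stream to the TestRunner.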
{% highlight java %}
InMemoryInputDescriptor<PageView> pageViewInput = inMemory.getInputDescriptor("page-views", new NoOpSerde<>());
{% endhighlight %}
{% highlight jproperties %}
INFO: To control the key of the IncomingMessageEnvelope seen by your Samza job, use org.apache.samza.operators.KV as the message type, e.g. InMemoryInputDescriptor<KV<String, PageView>>. The KV key (String here) becomes the envelope key and the KV value (PageView here) becomes the message. With any other message type, the key of the IncomingMessageEnvelope is set to null.
{% endhighlight %}
An output stream is described by an InMemoryOutputDescriptor. Output streams start empty, since your job writes to them, and only need to be initialized with a partition count.
{% highlight java %}
InMemoryOutputDescriptor<DecoratedPageView> outputPageViews = inMemory.getOutputDescriptor("decorated-page-views", new NoOpSerde<>());
{% endhighlight %}
{% highlight jproperties %}
Note: Input streams are immutable: once created, their contents cannot be modified (for example, by adding new messages). All input streams must be bounded.
{% endhighlight %}
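Step 3: Run the application with TestRunner, supplying the input messages and the partition count of each output stream. The duration passed to run() acts as a timeout for the run.
{% highlight java %}
// generateData(...) stands for your own test-data helper; the list must be finite (bounded)
List<PageView> pageViews = generateData(...);
TestRunner
    .of(new BadPageViewFilterApplication())
    .addInputStream(pageViewInput, pageViews)   // initialize the input stream with messages
    .addOutputStream(outputPageViews, 10)       // output stream with 10 partitions
    .run(Duration.ofMillis(1500));              // timeout for the run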
{% endhighlight %}
{% highlight jproperties %}
Info: Use addConfig(Map<String, String> configs) or addConfig(String key, String value) to add or override any config in the TestRunner.
{% endhighlight %}
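`generateData(...)` is left unspecified above. One hypothetical way to write it is a generic helper that builds a fixed number of messages and returns them as an unmodifiable list, matching the requirement that in-memory input streams be bounded and immutable. The class name `TestData` and the `(count, factory)` parameters are assumptions for illustration, not part of TestRunner.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntFunction;

final class TestData {
  private TestData() {}

  // Hypothetical helper: builds `count` messages with `factory` and returns an
  // unmodifiable list, so the input is bounded and cannot change after creation.
  static <T> List<T> generateData(int count, IntFunction<T> factory) {
    if (count < 0) {
      throw new IllegalArgumentException("count must be non-negative: " + count);
    }
    List<T> messages = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      messages.add(factory.apply(i));
    }
    return Collections.unmodifiableList(messages);
  }
}
```

Assuming a hypothetical `PageView(String viewerId, String pageKey)` constructor, usage could look like `List<PageView> pageViews = TestData.generateData(100, i -> new PageView("viewer-" + i, "page-" + (i % 10)));`. Deriving every field from the index keeps the data deterministic, so expected outputs can be computed the same way.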
Step 4: Assert on the output. You can either use StreamAssert, or consume the output stream with TestRunner.consumeStream and write your own assertions:
{% highlight java %}
// Multi-partitioned stream: map keys are partition ids, values are the expected messages in order
Map<Integer, List<DecoratedPageView>> expectedOutput = ...;
StreamAssert.containsInOrder(expectedOutput, outputPageViews, Duration.ofMillis(1000));

// Single-partitioned stream
StreamAssert.containsInOrder(Arrays.asList(...), outputPageViews, Duration.ofMillis(1000));
{% endhighlight %}
{% highlight java %}
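// consumeStream returns the output grouped by partition; expect exactly one message in partition 0
Assert.assertEquals(1, TestRunner.consumeStream(outputPageViews, Duration.ofMillis(1000)).get(0).size());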
{% endhighlight %}
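`TestRunner.consumeStream` groups the output by partition, which is why the assertion above calls `.get(0)`. When a test cares only about what was produced and not which partition it landed in, a small helper can flatten that grouping. This is a hypothetical sketch that assumes the result has the shape `Map<Integer, List<T>>` keyed by partition id; it concatenates partitions in ascending id order and preserves order within each partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionedOutput {
  private PartitionedOutput() {}

  // Concatenates per-partition message lists in ascending partition-id order.
  // Order is preserved within a partition; the order across partitions is only
  // the sort by id and says nothing about when messages were produced.
  static <T> List<T> flatten(Map<Integer, List<T>> byPartition) {
    List<T> all = new ArrayList<>();
    for (List<T> messages : new TreeMap<>(byPartition).values()) {
      all.addAll(messages);
    }
    return all;
  }
}
```

Since cross-partition order is arbitrary, compare the flattened result against expectations order-insensitively (for example, after sorting both lists) unless the stream has a single partition.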
Putting it all together, the complete test looks like this:
{% highlight java %}
@Test
public void testStreamDSLApi() throws Exception {
  // Generate mock data
  List<PageView> pageViews = generateMockInput(...);
  List<DecoratedPageView> expectedOutput = generateMockOutput(...);

  // Configure the system and stream descriptors (names must match the application's)
  InMemorySystemDescriptor inMemory = new InMemorySystemDescriptor("test");
  InMemoryInputDescriptor<PageView> pageViewInput =
      inMemory.getInputDescriptor("page-views", new NoOpSerde<>());
  InMemoryOutputDescriptor<DecoratedPageView> outputPageView =
      inMemory.getOutputDescriptor("decorated-page-views", new NoOpSerde<>());

  // Configure and run the TestRunner (single output partition, so the List-based assertion applies)
  TestRunner
      .of(new BadPageViewFilterApplication())
      .addInputStream(pageViewInput, pageViews)
      .addOutputStream(outputPageView, 1)
      .run(Duration.ofMillis(1500));

  // Assert the results
  StreamAssert.containsInOrder(expectedOutput, outputPageView, Duration.ofMillis(1000));
}
{% endhighlight %}
The same approach works for an application written with the low-level Task API (TaskApplication):
{% highlight java %}
public class BadPageViewFilter implements TaskApplication {
  @Override
  public void describe(TaskApplicationDescriptor appDesc) {
    // Describe the Kafka system (myZkServers and myBrokers are your connection settings)
    KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(myZkServers)
        .withProducerBootstrapServers(myBrokers);
    KVSerde<String, PageViewEvent> serde =
        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class));

    // Add input, output streams and tables
    appDesc.withInputStream(kafkaSystem.getInputDescriptor("pageViewEvent", serde));
    appDesc.withOutputStream(kafkaSystem.getOutputDescriptor("goodPageViewEvent", serde));
    appDesc.withTable(new RocksDbTableDescriptor<>(
        "badPageUrlTable", KVSerde.of(new StringSerde(), new IntegerSerde())));

    // Task instances are created through the factory
    appDesc.withTaskFactory(new BadPageViewTaskFactory());
  }
}

public class BadPageViewTaskFactory implements StreamTaskFactory {
  @Override
  public StreamTask createInstance() {
    return new BadPageViewFilterTask();
  }
}

public class BadPageViewFilterTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // process message synchronously
  }
}

@Test
public void testBadPageViewFilterTaskApplication() throws Exception {
  List<PageView> badPageViews = Arrays.asList(generatePageViews(..));
  List<PageView> expectedGoodPageViews = Arrays.asList(generatePageViews(..));

  // System and stream names must match those used by the application
  InMemorySystemDescriptor inMemory = new InMemorySystemDescriptor("kafka");
  InMemoryInputDescriptor<PageView> pageViewInput =
      inMemory.getInputDescriptor("pageViewEvent", new NoOpSerde<>());
  InMemoryOutputDescriptor<PageView> pageViewOutput =
      inMemory.getOutputDescriptor("goodPageViewEvent", new NoOpSerde<>());

  TestRunner
      .of(new BadPageViewFilter())
      .addInputStream(pageViewInput, badPageViews)
      .addOutputStream(pageViewOutput, 1)
      .run(Duration.ofSeconds(2));

  StreamAssert.containsInOrder(expectedGoodPageViews, pageViewOutput, Duration.ofMillis(1000));
}
{% endhighlight %}
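The `process` body of `BadPageViewFilterTask` is left as a comment. As a hypothetical sketch of the decision it might make, the pure function below treats a page view as good when its URL is absent from the bad-page-URL table or has a non-positive count. The table is modeled as a plain `Map<String, Integer>` to mirror the String/Integer serdes declared for `badPageUrlTable`; reading the real table from the task context is not shown, and the filtering rule itself is an assumption.

```java
import java.util.Map;

final class BadPageViewRules {
  private BadPageViewRules() {}

  // Hypothetical rule: a URL is bad only if the table holds a positive count for it.
  static boolean isGoodPageView(String pageUrl, Map<String, Integer> badPageUrlTable) {
    Integer badCount = badPageUrlTable.get(pageUrl);
    return badCount == null || badCount <= 0;
  }
}
```

Separating the rule from the task lets `process` stay a thin adapter (read the table, call the rule, send via the collector), while the rule gets fast unit tests of its own.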
Follow a similar approach for the legacy low-level API: pass the class that implements StreamTask or AsyncStreamTask to TestRunner.of(...).
{% highlight java %}
public class MultiplyByTenStreamTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    // Send the multiplied value to stream "output" of system "test", keeping the input key
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"),
        envelope.getKey(), envelope.getKey(), obj * 10));
  }
}

@Test
public void testLowLevelApi() throws Exception {
  List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
  List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);

  InMemorySystemDescriptor inMemory = new InMemorySystemDescriptor("test");
  InMemoryInputDescriptor<Integer> numInput =
      inMemory.getInputDescriptor("input", new NoOpSerde<Integer>());
  InMemoryOutputDescriptor<Integer> numOutput =
      inMemory.getOutputDescriptor("output", new NoOpSerde<Integer>());

  // Legacy API: pass the task class rather than an application instance
  TestRunner
      .of(MultiplyByTenStreamTask.class)
      .addInputStream(numInput, inputList)
      .addOutputStream(numOutput, 1)
      .run(Duration.ofSeconds(1));

  // Partition 0 of the output stream should contain the multiplied values in order
  Assert.assertThat(TestRunner.consumeStream(numOutput, Duration.ofMillis(1000)).get(0),
      IsIterableContainingInOrder.contains(outputList.toArray()));
}
{% endhighlight %}
No additional configuration or changes are required to use the TestRunner APIs with Samza jobs written against the StreamApplication or TaskApplication APIs.

The legacy task API supports only RocksDB stores, and each store needs the following configs added to TestRunner. For example, if your job uses a RocksDB store named "my-store" whose key and message serdes are both String:
{% highlight java %}
Map<String, String> config = new HashMap<>();
// RocksDB storage engine for the "my-store" store
config.put("stores.my-store.factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
// Register a serde named "string" and use it for the store's keys and messages
config.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
config.put("stores.my-store.key.serde", "string");
config.put("stores.my-store.msg.serde", "string");

TestRunner
    .of(...)
    .addConfig(config)
    ...
{% endhighlight %}
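When several legacy stores need the same setup, the configs above can be generated per store. The helper below is a hypothetical convenience, not part of TestRunner; it emits exactly the keys listed above for a store whose key and message serdes are both the registered `string` serde, and its result can be passed to `addConfig(Map<String, String>)`.

```java
import java.util.HashMap;
import java.util.Map;

final class LegacyStoreConfigs {
  private LegacyStoreConfigs() {}

  // Hypothetical helper: returns the configs a legacy RocksDB store with
  // String keys and messages needs when run under TestRunner.
  static Map<String, String> rocksDbStringStore(String storeName) {
    Map<String, String> config = new HashMap<>();
    config.put("stores." + storeName + ".factory",
        "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
    // Registers a serde named "string"; every store that uses it shares this entry.
    config.put("serializers.registry.string.class",
        "org.apache.samza.serializers.StringSerdeFactory");
    config.put("stores." + storeName + ".key.serde", "string");
    config.put("stores." + storeName + ".msg.serde", "string");
    return config;
  }
}
```

For multiple stores, merge the maps before calling `addConfig`, e.g. `config.putAll(LegacyStoreConfigs.rocksDbStringStore("my-store"))` once per store; the shared serializer entry simply overwrites itself with the same value.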