blob: 6afc77c5b1bd185bea047224978c1b4f0cc53c97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.framework;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
import org.apache.samza.test.controlmessages.TestData;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.PageViewToProfileJoinFunction;
import org.apache.samza.test.table.TestTableData;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.samza.test.controlmessages.TestData.PageView;
/**
 * Integration tests that run small {@link StreamApplication}s end-to-end through {@link TestRunner}.
 *
 * <p>The applications below describe their streams with Kafka descriptors on a system named "test";
 * each test registers in-memory input/output streams with the same system and stream names.
 */
public class StreamApplicationIntegrationTest {

  private static final Logger LOG = LoggerFactory.getLogger(StreamApplicationIntegrationTest.class);

  /** Candidate page keys; the test data picks from these at random. */
  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};

  /**
   * Joins a page-view stream against a RocksDB table filled from a bootstrapped profile stream and
   * checks that each of the 10 page views yields one enriched page view and one join key.
   */
  @Test
  public void testStatefulJoinWithLocalTable() {
    Random random = new Random();
    List<KV<String, TestTableData.PageView>> pageViews =
        keyByRandomPageKey(TestTableData.generatePageViews(10), random);
    List<KV<String, TestTableData.Profile>> profiles =
        keyByRandomPageKey(TestTableData.generateProfiles(10), random);

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<KV<String, TestTableData.PageView>> pageViewStreamDesc = isd
        .getInputDescriptor("PageView", new NoOpSerde<KV<String, TestTableData.PageView>>());
    // Bootstrapped so that the profile table is populated before page views are joined against it.
    InMemoryInputDescriptor<KV<String, TestTableData.Profile>> profileStreamDesc = isd
        .getInputDescriptor("Profile", new NoOpSerde<KV<String, TestTableData.Profile>>())
        .shouldBootstrap();
    InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
        .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
    InMemoryOutputDescriptor<String> joinKeysDescriptor = isd
        .getOutputDescriptor("JoinPageKeys", new NoOpSerde<>());

    TestRunner
        .of(new PageViewProfileViewJoinApplication())
        .addInputStream(pageViewStreamDesc, pageViews)
        .addInputStream(profileStreamDesc, profiles)
        .addOutputStream(outputStreamDesc, 1)
        .addOutputStream(joinKeysDescriptor, 1)
        .run(Duration.ofSeconds(2));

    // Both output streams have a single partition (index 0).
    Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
    Assert.assertEquals(10, TestRunner.consumeStream(joinKeysDescriptor, Duration.ofSeconds(1)).get(0).size());
  }

  /**
   * Repartitions {@code count} page views (one per member id) by member id and checks that a randomly
   * sampled output partition holds exactly one page view.
   */
  @Test
  public void testHighLevelApi() {
    Random random = new Random();
    int count = 10;
    List<PageView> pageViews = new ArrayList<>();
    for (int memberId = 0; memberId < count; memberId++) {
      // Any of the page keys may be chosen, including the last one.
      String pageKey = PAGEKEYS[random.nextInt(PAGEKEYS.length)];
      pageViews.add(new PageView(pageKey, memberId));
    }

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<PageView> imid = isd.getInputDescriptor("PageView", new NoOpSerde<PageView>());
    InMemoryOutputDescriptor<PageView> imod = isd.getOutputDescriptor("Output", new NoOpSerde<PageView>());

    // The output has `count` partitions so that the partition index sampled below always exists.
    TestRunner
        .of(new PageViewRepartitionApplication())
        .addInputStream(imid, pageViews)
        .addOutputStream(imod, count)
        .run(Duration.ofMillis(1500));

    // JUnit's assertEquals takes (expected, actual).
    Assert.assertEquals(1,
        TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size());
  }

  /**
   * Null page key is passed in input data which should fail filter logic; the test expects the
   * failure to surface from {@link TestRunner} as a {@link SamzaException}.
   */
  @Test(expected = SamzaException.class)
  public void testSamzaJobFailureForStreamApplication() {
    int count = 10;
    List<PageView> pageviews = new ArrayList<>();
    for (int memberId = 0; memberId < count; memberId++) {
      pageviews.add(new PageView(null, memberId));
    }

    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<PageView> imid = isd.getInputDescriptor("PageView", new NoOpSerde<PageView>());
    InMemoryOutputDescriptor<PageView> imod = isd.getOutputDescriptor("Output", new NoOpSerde<PageView>());

    TestRunner.of(new PageViewFilterApplication())
        .addInputStream(imid, pageviews)
        .addOutputStream(imod, 10)
        .run(Duration.ofMillis(1000));
  }

  /**
   * Pairs every value with a page key drawn at random from {@link #PAGEKEYS}.
   *
   * @param values the values to key
   * @param random source of randomness for choosing page keys
   * @return a list of (random page key, value) pairs, in the order of {@code values}
   */
  private static <V> List<KV<String, V>> keyByRandomPageKey(V[] values, Random random) {
    return Arrays.stream(values)
        .map(value -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], value))
        .collect(Collectors.toList());
  }

  /**
   * Loads profiles into a RocksDB table keyed by member id. It then repartitions page views by member
   * id and joins them against that table. Enriched page views are written to "EnrichedPageView" and
   * their page keys to the "JoinPageKeys" stream.
   */
  private static class PageViewProfileViewJoinApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(
          new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
              KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));

      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
      KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
          ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
      KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
      KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
          ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());

      // Re-key each profile by its member id (typed KV instead of a raw KV) before writing it to the table.
      appDescriptor.getInputStream(profileISD)
          .map(m -> KV.of(m.getValue().getMemberId(), m.getValue()))
          .sendTo(table)
          .sink((kv, collector, coordinator) -> {
            LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
          });

      OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
      appDescriptor.getInputStream(pageViewISD)
          .partitionBy(pv -> pv.getValue().getMemberId(), pv -> pv.getValue(),
              KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
          .join(table, new PageViewToProfileJoinFunction())
          .sendTo(outputStream)
          .map(TestTableData.EnrichedPageView::getPageKey)
          .sink((joinPageKey, collector, coordinator) -> {
            collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
          });
    }
  }

  /**
   * Filters page views down to the "inbox" page. A null page key makes the filter throw a
   * NullPointerException; {@code testSamzaJobFailureForStreamApplication} relies on this on purpose.
   */
  private static class PageViewFilterApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
      KafkaInputDescriptor<KV<String, PageView>> isd =
          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
      MessageStream<KV<String, PageView>> inputStream = appDescriptor.getInputStream(isd);
      // Intentionally not null-safe: dereferencing a null page key is the failure under test.
      inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
    }
  }

  /**
   * Repartitions page views by member id and writes each one to the "Output" stream. The member id
   * is used as both the partition key and the message key.
   */
  private static class PageViewRepartitionApplication implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
      KafkaInputDescriptor<KV<String, PageView>> isd =
          ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
      MessageStream<KV<String, PageView>> inputStream = appDescriptor.getInputStream(isd);
      inputStream
          .map(KV::getValue)
          .partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), "p1")
          .sink((m, collector, coordinator) ->
              collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"), m.getKey(), m.getKey(), m)));
    }
  }
}