/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.table;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.Context;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
import org.apache.samza.test.harness.IntegrationTestHarness;
import org.apache.samza.test.util.ArraySystemFactory;
import org.apache.samza.test.util.Base64Serializer;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
import static org.apache.samza.test.table.TestTableData.Profile;
import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * End-to-end tests for sendTo() and join() against local (in-memory) tables.
 */
public class TestLocalTableEndToEnd extends IntegrationTestHarness {
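/**
 * Verifies that profiles mapped to KV pairs and written to an in-memory table via sendTo()
 * are received by every task and can be read back from the table.
 */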
@Test
public void testSendTo() throws Exception {
int count = 10;
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
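// Serve the serialized profiles as the Profile stream through the array-backed test system.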
configs.put("streams.Profile.samza.system", "test");
configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
MyMapFunction mapFn = new MyMapFunction();
final StreamApplication app = appDesc -> {
Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor<>("t1",
KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
appDesc.getInputStream(isd)
.map(mapFn)
.sendTo(table);
};
Config config = new MapConfig(configs);
final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
executeRun(runner, config);
runner.waitForFinish();
for (int i = 0; i < partitionCount; i++) {
MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
assertEquals(count, mapFnCopy.received.size());
mapFnCopy.received.forEach(p -> Assert.assertNotNull(mapFnCopy.table.get(p.getMemberId())));
}
}
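/**
 * Populates a profile table from the Profile stream, then joins the repartitioned PageView
 * stream against it, recording both the raw and the joined page views for verification.
 */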
static class StreamTableJoinApp implements StreamApplication {
static List<PageView> received = new LinkedList<>();
static List<EnrichedPageView> joined = new LinkedList<>();
@Override
public void describe(StreamApplicationDescriptor appDesc) {
Table<KV<Integer, Profile>> table = appDesc.getTable(
new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
appDesc.getInputStream(profileISD)
.map(m -> KV.of(m.getMemberId(), m))
.sendTo(table);
GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
appDesc.getInputStream(pageViewISD)
.map(pv -> {
received.add(pv);
return pv;
})
.partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.join(table, new PageViewToProfileJoinFunction())
.sink((m, collector, coordinator) -> joined.add(m));
}
}
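/**
 * Runs {@link StreamTableJoinApp} with a bootstrapped Profile stream and verifies that every
 * page view from every partition is joined into an {@link EnrichedPageView}.
 */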
@Test
public void testStreamTableJoin() throws Exception {
int count = 10;
PageView[] pageViews = TestTableData.generatePageViews(count);
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
configs.put("streams.Profile.samza.system", "test");
configs.put("streams.Profile.samza.bootstrap", "true");
configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
Config config = new MapConfig(configs);
final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
executeRun(runner, config);
runner.waitForFinish();
assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
}
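/**
 * Feeds a single profile table from two Profile streams and joins two independent PageView
 * streams against it, recording the inputs and join results of each pipeline separately.
 */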
static class DualStreamTableJoinApp implements StreamApplication {
static List<Profile> sentToProfileTable1 = new LinkedList<>();
static List<Profile> sentToProfileTable2 = new LinkedList<>();
static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
@Override
public void describe(StreamApplicationDescriptor appDesc) {
KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
profileStream1
.map(m -> {
sentToProfileTable1.add(m);
return KV.of(m.getMemberId(), m);
})
.sendTo(profileTable);
profileStream2
.map(m -> {
sentToProfileTable2.add(m);
return KV.of(m.getMemberId(), m);
})
.sendTo(profileTable);
GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<>());
GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<>());
MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
pageViewStream1
.partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
.join(profileTable, joinFn1)
.sink((m, collector, coordinator) -> joinedPageViews1.add(m));
pageViewStream2
.partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
.join(profileTable, joinFn2)
.sink((m, collector, coordinator) -> joinedPageViews2.add(m));
}
}
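/**
 * Runs {@link DualStreamTableJoinApp} and verifies that both Profile streams reach the shared
 * table and that both PageView streams are fully joined into {@link EnrichedPageView}s.
 */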
@Test
public void testDualStreamTableJoin() throws Exception {
int count = 10;
PageView[] pageViews = TestTableData.generatePageViews(count);
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
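// Both Profile streams are bootstrapped so the shared table is loaded before either PageView stream is joined.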
configs.put("streams.Profile1.samza.system", "test");
configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
configs.put("streams.Profile1.samza.bootstrap", "true");
configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
configs.put("streams.Profile2.samza.system", "test");
configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
configs.put("streams.Profile2.samza.bootstrap", "true");
configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
configs.put("streams.PageView1.samza.system", "test");
configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
configs.put("streams.PageView2.samza.system", "test");
configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
Config config = new MapConfig(configs);
final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
executeRun(runner, config);
runner.waitForFinish();
assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
}
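/**
 * Base job configuration shared by all tests: the array-backed test system for inputs, a
 * passthrough job coordinator with a single-container grouper, and a Kafka system for the
 * intermediate streams created by partitionBy().
 */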
static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
Map<String, String> configs = new HashMap<>();
configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
configs.put(JobConfig.JOB_NAME, "test-table-job");
configs.put(JobConfig.PROCESSOR_ID, "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY, SingleContainerGrouperFactory.class.getName());
// For intermediate streams
configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
configs.put("systems.kafka.samza.key.serde", "int");
configs.put("systems.kafka.samza.msg.serde", "json");
configs.put("systems.kafka.default.stream.replication.factor", "1");
configs.put("job.default.system", "kafka");
configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
return configs;
}
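/**
 * Map function that keys profiles by member id, records every profile it receives, and holds a
 * handle to table "t1"; instances are registered per task so tests can inspect them afterwards.
 */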
private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
private transient List<Profile> received;
private transient ReadWriteTable<Integer, Profile> table;
@Override
public void init(Context context) {
table = context.getTaskContext().getTable("t1");
this.received = new ArrayList<>();
taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
}
@Override
public KV<Integer, Profile> apply(Profile profile) {
received.add(profile);
return KV.of(profile.getMemberId(), profile);
}
public static MyMapFunction getMapFunctionByTask(String taskName) {
return taskToMapFunctionMap.get(taskName);
}
}
}