blob: e1386c8c74e12a521800c8b703f979922c7d3721 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.table;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.LocalStoreBackedReadWriteTable;
import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.Table;
import org.apache.samza.task.TaskContext;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.apache.samza.test.util.ArraySystemFactory;
import org.apache.samza.test.util.Base64Serializer;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.samza.test.table.TestTableData.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
 * Integration tests for local (in-memory / local-store backed) tables.
 * Covers three areas: writing a stream into a table via sendTo(), joining one
 * or two partitioned streams against a table populated from a bootstrap
 * stream, and the async get/put/delete API of
 * {@link LocalStoreBackedReadWriteTable} against a mocked key-value store.
 */
public class TestLocalTable extends AbstractIntegrationTestHarness {

  /**
   * Writes a generated Profile stream into an in-memory table with sendTo().
   * After the job finishes, every task partition's map-function copy must have
   * seen all profiles and be able to read each one back from the table by
   * member id.
   */
  @Test
  public void testSendTo() throws Exception {

    int count = 10;
    Profile[] profiles = TestTableData.generateProfiles(count);

    int partitionCount = 4;
    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());

    configs.put("streams.Profile.samza.system", "test");
    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));

    MyMapFunction mapFn = new MyMapFunction();

    final StreamApplication app = appDesc -> {

      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());

      appDesc.getInputStream(isd)
          .map(mapFn)
          .sendTo(table);
    };

    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
    runner.run();
    runner.waitForFinish();

    for (int i = 0; i < partitionCount; i++) {
      // Each task has registered its own copy of the map function keyed by task name.
      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
      assertEquals(count, mapFnCopy.received.size());
      // assertNotNull gives a clearer failure message than assertTrue(x != null).
      mapFnCopy.received.forEach(p -> Assert.assertNotNull(mapFnCopy.table.get(p.getMemberId())));
    }
  }

  /**
   * Helper that builds and runs a single stream-table join topology:
   * a bootstrapped Profile stream populates table "t1", then a PageView stream
   * is repartitioned by member id and joined against the table. Static lists
   * accumulate observed/joined records across all tasks for assertions.
   */
  static class TestStreamTableJoin {
    static final List<PageView> received = new LinkedList<>();
    static final List<EnrichedPageView> joined = new LinkedList<>();

    final int count;
    final int partitionCount;
    final Map<String, String> configs;

    /**
     * @param count number of records generated per input partition
     * @param partitionCount number of partitions of each input stream
     * @param configs job configuration (must describe the "PageView" and "Profile" streams)
     */
    TestStreamTableJoin(int count, int partitionCount, Map<String, String> configs) {
      this.count = count;
      this.partitionCount = partitionCount;
      this.configs = configs;
    }

    void runTest() {
      final StreamApplication app = appDesc -> {

        Table<KV<Integer, Profile>> table = appDesc.getTable(
            new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
        DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
        GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
        appDesc.getInputStream(profileISD)
            .map(m -> new KV<>(m.getMemberId(), m))
            .sendTo(table);

        GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
        appDesc.getInputStream(pageViewISD)
            .map(pv -> {
              received.add(pv);
              return pv;
            })
            .partitionBy(PageView::getMemberId, v -> v, "p1")
            .join(table, new PageViewToProfileJoinFunction())
            .sink((m, collector, coordinator) -> joined.add(m));
      };

      final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
      runner.run();
      runner.waitForFinish();

      assertEquals(count * partitionCount, received.size());
      assertEquals(count * partitionCount, joined.size());
      assertTrue(joined.get(0) instanceof EnrichedPageView);
    }
  }

  /**
   * Runs the single stream-table join with the Profile stream marked as a
   * bootstrap stream, so the table is fully populated before page views are
   * processed.
   */
  @Test
  public void testStreamTableJoin() throws Exception {

    int count = 10;
    PageView[] pageViews = TestTableData.generatePageViews(count);
    Profile[] profiles = TestTableData.generateProfiles(count);

    int partitionCount = 4;
    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());

    configs.put("streams.PageView.samza.system", "test");
    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));

    configs.put("streams.Profile.samza.system", "test");
    configs.put("streams.Profile.samza.bootstrap", "true");
    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));

    TestStreamTableJoin joinTest = new TestStreamTableJoin(count, partitionCount, configs);
    joinTest.runTest();
  }

  /**
   * Helper topology where a single shared table is written by two Profile
   * streams and joined against by two independently repartitioned PageView
   * streams. Verifies that one table instance can serve multiple writers and
   * multiple join operators in the same job.
   */
  static class TestDualStreamTableJoin {
    static final List<Profile> sentToProfileTable1 = new LinkedList<>();
    static final List<Profile> sentToProfileTable2 = new LinkedList<>();
    static final List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
    static final List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();

    final int count;
    final int partitionCount;
    final Map<String, String> configs;

    /**
     * @param count number of records generated per input partition
     * @param partitionCount number of partitions of each input stream
     * @param configs job configuration (must describe the Profile1/2 and PageView1/2 streams)
     */
    TestDualStreamTableJoin(int count, int partitionCount, Map<String, String> configs) {
      this.count = count;
      this.partitionCount = partitionCount;
      this.configs = configs;
    }

    void runTest() {
      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());

      PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
      PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();

      final StreamApplication app = appDesc -> {

        Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));

        DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
        GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
        GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
        MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
        MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);

        profileStream1
            .map(m -> {
              sentToProfileTable1.add(m);
              return new KV<>(m.getMemberId(), m);
            })
            .sendTo(profileTable);
        profileStream2
            .map(m -> {
              sentToProfileTable2.add(m);
              return new KV<>(m.getMemberId(), m);
            })
            .sendTo(profileTable);

        GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<>());
        GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<>());
        MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
        MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);

        pageViewStream1
            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
            .join(profileTable, joinFn1)
            .sink((m, collector, coordinator) -> joinedPageViews1.add(m));

        pageViewStream2
            .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
            .join(profileTable, joinFn2)
            .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
      };

      final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
      runner.run();
      runner.waitForFinish();

      assertEquals(count * partitionCount, sentToProfileTable1.size());
      assertEquals(count * partitionCount, sentToProfileTable2.size());
      assertEquals(count * partitionCount, joinedPageViews1.size());
      assertEquals(count * partitionCount, joinedPageViews2.size());
      assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
      assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
    }
  }

  /**
   * Runs the dual stream-table join. Both Profile streams are bootstrap
   * streams so the shared table is populated before either PageView stream is
   * joined.
   */
  @Test
  public void testDualStreamTableJoin() throws Exception {

    int count = 10;
    PageView[] pageViews = TestTableData.generatePageViews(count);
    Profile[] profiles = TestTableData.generateProfiles(count);

    int partitionCount = 4;
    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());

    configs.put("streams.Profile1.samza.system", "test");
    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
    configs.put("streams.Profile1.samza.bootstrap", "true");
    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));

    configs.put("streams.Profile2.samza.system", "test");
    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
    configs.put("streams.Profile2.samza.bootstrap", "true");
    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));

    configs.put("streams.PageView1.samza.system", "test");
    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));

    configs.put("streams.PageView2.samza.system", "test");
    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));

    TestDualStreamTableJoin dualJoinTest = new TestDualStreamTableJoin(count, partitionCount, configs);
    dualJoinTest.runTest();
  }

  /**
   * Base job configuration shared by all tests in this class: an array-backed
   * "test" system for inputs, passthrough coordination for standalone
   * execution, and a "kafka" system for intermediate (repartition) streams.
   *
   * @param bootstrapUrl Kafka bootstrap servers for intermediate streams
   * @param zkConnect ZooKeeper connect string for the Kafka consumer
   * @return mutable config map that callers extend with stream definitions
   */
  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
    Map<String, String> configs = new HashMap<>();
    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());

    configs.put(JobConfig.JOB_NAME(), "test-table-job");
    configs.put(JobConfig.PROCESSOR_ID(), "1");
    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

    // For intermediate streams
    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
    configs.put("systems.kafka.samza.key.serde", "int");
    configs.put("systems.kafka.samza.msg.serde", "json");
    configs.put("systems.kafka.default.stream.replication.factor", "1");
    configs.put("job.default.system", "kafka");

    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());

    return configs;
  }

  /**
   * Map function that keys profiles by member id, while recording every
   * profile it sees and a handle to table "t1". Each task instance registers
   * itself in a static map keyed by task name so tests can inspect per-task
   * state after the job finishes.
   */
  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {

    private static final Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();

    private transient List<Profile> received;
    private transient ReadableTable table;

    @Override
    public void init(Config config, TaskContext context) {
      table = (ReadableTable) context.getTable("t1");
      this.received = new ArrayList<>();

      taskToMapFunctionMap.put(context.getTaskName().getTaskName(), this);
    }

    @Override
    public KV<Integer, Profile> apply(Profile profile) {
      received.add(profile);
      return new KV<>(profile.getMemberId(), profile);
    }

    public static MyMapFunction getMapFunctionByTask(String taskName) {
      return taskToMapFunctionMap.get(taskName);
    }
  }

  /**
   * Exercises the async API of {@link LocalStoreBackedReadWriteTable} against
   * a mocked {@link KeyValueStore}: each *Async call must complete and
   * delegate exactly once to the corresponding synchronous store operation.
   */
  @Test
  public void testAsyncOperation() throws Exception {
    KeyValueStore kvStore = mock(KeyValueStore.class);
    LocalStoreBackedReadWriteTable<String, String> table = new LocalStoreBackedReadWriteTable<>("table1", kvStore);

    TaskContext taskContext = mock(TaskContext.class);
    // Stub out metrics so table.init() can register timers/counters/gauges.
    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
    doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
    doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();

    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);

    table.init(containerContext, taskContext);

    // GET
    doReturn("bar").when(kvStore).get(anyString());
    Assert.assertEquals("bar", table.getAsync("foo").get());

    // GET-ALL
    Map<String, String> recordMap = new HashMap<>();
    recordMap.put("foo1", "bar1");
    recordMap.put("foo2", "bar2");
    doReturn(recordMap).when(kvStore).getAll(anyList());
    Assert.assertEquals(recordMap, table.getAllAsync(Arrays.asList("foo1", "foo2")).get());

    // PUT
    table.putAsync("foo1", "bar1").get();
    verify(kvStore, times(1)).put(anyString(), anyString());

    // PUT-ALL
    List<Entry<String, String>> records = Arrays.asList(new Entry<>("foo1", "bar1"), new Entry<>("foo2", "bar2"));
    table.putAllAsync(records).get();
    verify(kvStore, times(1)).putAll(anyList());

    // DELETE
    table.deleteAsync("foo").get();
    verify(kvStore, times(1)).delete(anyString());

    // DELETE-ALL
    table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
    verify(kvStore, times(1)).deleteAll(anyList());
  }
}