| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.test.samzasql; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.calcite.plan.RelOptUtil; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.serializers.JsonSerdeV2Factory; |
| import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; |
| import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; |
| import org.apache.samza.sql.system.TestAvroSystemFactory; |
| import org.apache.samza.sql.testutil.JsonUtil; |
| import org.apache.samza.sql.testutil.MyTestUdf; |
| import org.apache.samza.sql.testutil.SamzaSqlTestConfig; |
| import org.apache.samza.system.OutgoingMessageEnvelope; |
| import org.apache.samza.test.harness.AbstractIntegrationTestHarness; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class); |
| private final Map<String, String> configs = new HashMap<>(); |
| |
| @Before |
| public void setUp() { |
| super.setUp(); |
| configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); |
| configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); |
| configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); |
| configs.put("systems.kafka.samza.key.serde", "object"); |
| configs.put("systems.kafka.samza.msg.serde", "samzaSqlRelMsg"); |
| configs.put("systems.kafka.default.stream.replication.factor", "1"); |
| configs.put("job.default.system", "kafka"); |
| |
| configs.put("serializers.registry.object.class", JsonSerdeV2Factory.class.getName()); |
| configs.put("serializers.registry.samzaSqlRelMsg.class", JsonSerdeV2Factory.class.getName()); |
| } |
| |
| @Test |
| public void testEndToEnd() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; |
| List<String> sqlStmts = Arrays.asList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<Integer> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) |
| .sorted() |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages)); |
| } |
| |
| @Test |
| public void testEndToEndWithProjection() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql1 = "Insert into testavro.outputTopic(id, long_value) " |
| + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1"; |
| List<String> sqlStmts = Arrays.asList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<Integer> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) |
| .sorted() |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages)); |
| } |
| |
| @Test |
| public void testEndToEndFlatten() throws Exception { |
| int numMessages = 20; |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| |
| LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()); |
| String sql1 = |
| "Insert into testavro.outputTopic(string_value, id, bytes_value, fixed_value, float_value) " |
| + " select Flatten(array_values) as string_value, id, bytes_value, fixed_value, float_value " |
| + " from testavro.COMPLEX1"; |
| List<String> sqlStmts = Collections.singletonList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); |
| |
| int expectedMessages = 0; |
| // Flatten de-normalizes the data. So there is separate record for each entry in the array. |
| for (int index = 1; index < numMessages; index++) { |
| expectedMessages = expectedMessages + Math.max(1, index); |
| } |
| Assert.assertEquals(expectedMessages, outMessages.size()); |
| } |
| |
| @Test |
| public void testEndToEndSubQuery() throws Exception { |
| int numMessages = 20; |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql1 = |
| "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; |
| List<String> sqlStmts = Collections.singletonList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); |
| |
| int expectedMessages = 0; |
| // Flatten de-normalizes the data. So there is separate record for each entry in the array. |
| for (int index = 1; index < numMessages; index++) { |
| expectedMessages = expectedMessages + Math.max(1, index); |
| } |
| Assert.assertEquals(expectedMessages, outMessages.size()); |
| } |
| |
| @Test |
| public void testEndToEndUdf() throws Exception { |
| int numMessages = 20; |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql1 = "Insert into testavro.outputTopic(id, long_value) " |
| + "select id, MyTest(id) as long_value from testavro.SIMPLE1"; |
| List<String> sqlStmts = Collections.singletonList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| LOG.info("output Messages " + TestAvroSystemFactory.messages); |
| |
| List<Integer> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString())) |
| .sorted() |
| .collect(Collectors.toList()); |
| Assert.assertEquals(outMessages.size(), numMessages); |
| MyTestUdf udf = new MyTestUdf(); |
| |
| Assert.assertTrue( |
| IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()).equals(outMessages)); |
| } |
| |
| @Test |
| public void testRegexMatchUdfInWhereClause() throws Exception { |
| int numMessages = 20; |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql1 = |
| "Insert into testavro.outputTopic(id) " |
| + "select id " |
| + "from testavro.SIMPLE1 " |
| + "where RegexMatch('.*4', name)"; |
| List<String> sqlStmts = Collections.singletonList(sql1); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| LOG.info("output Messages " + TestAvroSystemFactory.messages); |
| // There should be two messages that contain "4" |
| Assert.assertEquals(TestAvroSystemFactory.messages.size(), 2); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableInnerJoin() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| staticConfigs.putAll(configs); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," |
| + " p.name as profileName, p.address as profileAddress " |
| + "from testavro.PROFILE.`$table` as p " |
| + "join testavro.PAGEVIEW as pv " |
| + " on p.id = pv.profileId"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString())) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| staticConfigs.putAll(configs); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PROFILE.`$table` as p " |
| + "join testavro.PAGEVIEW as pv " |
| + " on p.id = pv.profileId"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> { |
| GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress"); |
| GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum")); |
| return ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString()) + "," |
| + profileAddr.get("zip") + "," + streetNum.get("number"); |
| }) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameAddressJoin(numMessages); |
| Assert.assertEquals(outMessages, expectedOutMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| staticConfigs.putAll(configs); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PROFILE.`$table` as p " |
| + "join testavro.PAGEVIEW as pv " |
| + " on p.id = pv.profileId " |
| + "where p.name = 'Mike'"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString())) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(4, outMessages.size()); |
| List<String> expectedOutMessages = |
| TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages) |
| .stream() |
| .filter(msg -> msg.endsWith("Mike")) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableInnerJoinWithNullForeignKeys() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PAGEVIEW as pv " |
| + "join testavro.PROFILE.`$table` as p " |
| + " on pv.profileId = p.id"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString())) |
| .collect(Collectors.toList()); |
| // Half the foreign keys are null. |
| Assert.assertEquals(numMessages / 2, outMessages.size()); |
| List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableLeftJoin() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PAGEVIEW as pv " |
| + "left join testavro.PROFILE.`$table` as p " |
| + " on pv.profileId = p.id"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString())) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| List<String> expectedOutMessages = |
| TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableRightJoin() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PROFILE.`$table` as p " |
| + "right join testavro.PAGEVIEW as pv " |
| + " on p.id = pv.profileId"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : |
| ((GenericRecord) x.getMessage()).get("profileName").toString())) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| List<String> expectedOutMessages = |
| TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableTableJoin() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PAGEVIEW as pv " |
| + "join testavro.PROFILE.`$table` as p " |
| + " on p.id = pv.profileId " |
| + " join testavro.COMPANY.`$table` as c " |
| + " on p.companyId = c.id"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + ((GenericRecord) x.getMessage()).get("profileName").toString() + "," |
| + ((GenericRecord) x.getMessage()).get("companyName").toString()) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(numMessages, outMessages.size()); |
| List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| @Test |
| public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception { |
| int numMessages = 20; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); |
| String sql = |
| "Insert into testavro.enrichedPageViewTopic " |
| + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName," |
| + " p.address as profileAddress " |
| + "from testavro.PAGEVIEW as pv " |
| + "join testavro.PROFILE.`$table` as p " |
| + " on p.id = pv.profileId " |
| + " join testavro.COMPANY.`$table` as c " |
| + " on p.companyId = c.id AND c.id = pv.profileId"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| List<String> outMessages = TestAvroSystemFactory.messages.stream() |
| .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," |
| + ((GenericRecord) x.getMessage()).get("profileName").toString() + "," |
| + ((GenericRecord) x.getMessage()).get("companyName").toString()) |
| .collect(Collectors.toList()); |
| Assert.assertEquals(TestAvroSystemFactory.companies.length, outMessages.size()); |
| List<String> expectedOutMessages = |
| TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(TestAvroSystemFactory.companies.length); |
| Assert.assertEquals(expectedOutMessages, outMessages); |
| } |
| |
| // Disabling the test until SAMZA-1652 and SAMZA-1661 are fixed. |
| @Ignore |
| @Test |
| public void testEndToEndGroupBy() throws Exception { |
| int numMessages = 200; |
| long windowDurationMs = 200; |
| |
| TestAvroSystemFactory.messages.clear(); |
| Map<String, String> staticConfigs = |
| SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, false, windowDurationMs); |
| staticConfigs.putAll(configs); |
| String sql = |
| "Insert into testavro.pageViewCountTopic" |
| + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`" |
| + " from testavro.PAGEVIEW as pv" |
| + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" |
| + " group by (pv.pageKey)"; |
| |
| List<String> sqlStmts = Arrays.asList(sql); |
| staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); |
| SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); |
| runner.runAndWaitForFinish(); |
| |
| // Let's capture the list of windows/counts per key. |
| HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>(); |
| TestAvroSystemFactory.messages.stream() |
| .map(x -> { |
| String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString(); |
| String count = ((GenericRecord) x.getMessage()).get("count").toString(); |
| pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count); |
| return pageKeyCountListMap; |
| }); |
| |
| HashMap<String, Integer> pageKeyCountMap = new HashMap<>(); |
| pageKeyCountListMap.forEach((key, list) -> { |
| // Check that the number of windows per key is non-zero but less than the number of input messages per key. |
| Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length); |
| // Collapse the count of messages per key |
| pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum()); |
| }); |
| |
| Set<String> pageKeys = new HashSet<>(Arrays.asList("job", "inbox")); |
| HashMap<String, Integer> expectedPageKeyCountMap = |
| TestAvroSystemFactory.getPageKeyGroupByResult(numMessages, pageKeys); |
| |
| Assert.assertEquals(expectedPageKeyCountMap, pageKeyCountMap); |
| } |
| } |