blob: c41038da8e6148321b4fafc99c52966962c662bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.samzasql;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.planner.SamzaSqlValidator;
import org.apache.samza.sql.planner.SamzaSqlValidatorException;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Integration and validation tests for Samza SQL statements that use the {@code testRemoteStore}
 * system, both as the target of an {@code Insert into} and as the table side of a join against
 * the {@code testavro.PAGEVIEW} stream.
 *
 * <p>Results are observed through the static collections {@code TestAvroSystemFactory.messages}
 * and {@code RemoteStoreIOResolverTestFactory.records}; tests that assert on them clear them
 * before running.
 */
public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {

  /** Copies key, id and name of every {@code testavro.SIMPLE1} message into the remote table. */
  private static final String SINK_WITH_KEY_SQL =
      "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1";

  /** Shared head of every enrichment query: the insert target, the projection and the table side. */
  private static final String ENRICH_FROM_PROFILE =
      "Insert into testavro.enrichedPageViewTopic "
          + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
          + " p.name as profileName, p.address as profileAddress "
          + "from testRemoteStore.Profile.`$table` as p ";

  private static final String JOIN_PAGEVIEW = "join testavro.PAGEVIEW as pv ";

  private static final String RIGHT_JOIN_PAGEVIEW = "right join testavro.PAGEVIEW as pv ";

  /** Inner join keyed directly on {@code pv.profileId}. */
  private static final String JOIN_ON_KEY_SQL =
      ENRICH_FROM_PROFILE + JOIN_PAGEVIEW + " on p.__key__ = pv.profileId";

  /** Inner join whose key is produced by the {@code BuildOutputRecord} UDF. */
  private static final String JOIN_ON_UDF_KEY_SQL =
      ENRICH_FROM_PROFILE + JOIN_PAGEVIEW + " on p.__key__ = BuildOutputRecord('id', pv.profileId)";

  /** Filter appended to the join queries by the "WithFilter" tests. */
  private static final String MIKE_FILTER = " where p.name = 'Mike' and pv.profileId = 1";

  /** Filter used by the two negative join-condition tests. */
  private static final String NAME_NULL_OR_NON_ZERO_FILTER = " where p.name is null or p.name <> '0'";

  /** Emits a {@code 'DELETE'} op for every profile key that joins with a page view. */
  private static final String DELETE_JOINED_PROFILES_SQL =
      "Insert into testRemoteStore.Profile.`$table` "
          + "select p.__key__ as __key__, 'DELETE' as __op__ "
          + "from testRemoteStore.Profile.`$table` as p "
          + "join testavro.PAGEVIEW as pv "
          + " on p.__key__ = pv.profileId ";

  /** Inserting keyed records must leave one record per input message in the remote store. */
  @Test
  public void testSinkEndToEndWithKey() throws SamzaSqlValidatorException {
    int numMessages = 20;
    RemoteStoreIOResolverTestFactory.records.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);

    runApplication(validatedConfig(staticConfigs, SINK_WITH_KEY_SQL));

    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
  }

  /** Same as {@link #testSinkEndToEndWithKey()} but with the extra config flags {@code false, true}. */
  @Test
  @Ignore("Disabled due to flakiness related to data generation; Refer Pull Request #905 for details")
  public void testSinkEndToEndWithKeyWithNullRecords() throws SamzaSqlValidatorException {
    int numMessages = 20;
    RemoteStoreIOResolverTestFactory.records.clear();
    Map<String, String> props = new HashMap<>();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, false, true);

    runApplication(validatedConfig(staticConfigs, SINK_WITH_KEY_SQL));

    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
  }

  /**
   * Inserting without a {@code __key__} column is expected to end in an {@link AssertionError}.
   * NOTE(review): it is not visible from this class whether the error comes from the final size
   * check or from inside the application run - confirm before tightening the expectation.
   */
  @Test (expected = AssertionError.class)
  public void testSinkEndToEndWithoutKey() throws SamzaSqlValidatorException {
    int numMessages = 20;
    RemoteStoreIOResolverTestFactory.records.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
    String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";

    runApplication(validatedConfig(staticConfigs, sql));

    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
  }

  @Test
  public void testJoinEndToEnd() throws SamzaSqlValidatorException {
    testJoinEndToEndHelper(false);
  }

  @Test
  public void testJoinEndToEndWithUdf() throws SamzaSqlValidatorException {
    testJoinEndToEndWithUdfHelper(false);
  }

  @Test
  public void testJoinEndToEndWithOptimizer() throws SamzaSqlValidatorException {
    testJoinEndToEndHelper(true);
  }

  @Test
  public void testJoinEndToEndWithUdfAndOptimizer() throws SamzaSqlValidatorException {
    testJoinEndToEndWithUdfHelper(true);
  }

  /**
   * Joins page views with the profile table on the plain key and expects one enriched message per
   * page view.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testJoinEndToEndHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    int numMessages = 20;
    List<String> outMessages = runProfileJoin(JOIN_ON_KEY_SQL, numMessages, enableOptimizer);

    Assert.assertEquals(numMessages, outMessages.size());
    Assert.assertEquals(TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages), outMessages);
  }

  /**
   * Same expectation as {@link #testJoinEndToEndHelper(boolean)}, with the join key built by the
   * {@code BuildOutputRecord} UDF.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testJoinEndToEndWithUdfHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    int numMessages = 20;
    List<String> outMessages = runProfileJoin(JOIN_ON_UDF_KEY_SQL, numMessages, enableOptimizer);

    Assert.assertEquals(numMessages, outMessages.size());
    Assert.assertEquals(TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages), outMessages);
  }

  @Test
  public void testJoinEndToEndWithFilter() throws SamzaSqlValidatorException {
    testJoinEndToEndWithFilterHelper(false);
  }

  @Test
  public void testJoinEndToEndWithUdfAndFilter() throws SamzaSqlValidatorException {
    testJoinEndToEndWithUdfAndFilterHelper(false);
  }

  @Test
  public void testJoinEndToEndWithFilterAndOptimizer() throws SamzaSqlValidatorException {
    testJoinEndToEndWithFilterHelper(true);
  }

  @Test
  public void testJoinEndToEndWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
    testJoinEndToEndWithUdfAndFilterHelper(true);
  }

  /**
   * Adds the "Mike / profileId = 1" filter to the plain-key join and expects exactly one output.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testJoinEndToEndWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    int numMessages = 20;
    List<String> outMessages = runProfileJoin(JOIN_ON_KEY_SQL + MIKE_FILTER, numMessages, enableOptimizer);

    Assert.assertEquals(1, outMessages.size());
    // Expected value goes first so that a failure message labels the two values correctly.
    Assert.assertEquals("home,Mike", outMessages.get(0));
  }

  /**
   * Adds the "Mike / profileId = 1" filter to the UDF-key join and expects exactly one output.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testJoinEndToEndWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    int numMessages = 20;
    List<String> outMessages = runProfileJoin(JOIN_ON_UDF_KEY_SQL + MIKE_FILTER, numMessages, enableOptimizer);

    Assert.assertEquals(1, outMessages.size());
    // Expected value goes first so that a failure message labels the two values correctly.
    Assert.assertEquals("home,Mike", outMessages.get(0));
  }

  /** Inner join against the three-argument config variant; half of the page views are expected. */
  @Test
  public void testSourceEndToEndWithKeyWithNullForeignKeys() throws SamzaSqlValidatorException {
    int numMessages = 20;
    resetCapturedState();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    populateProfileTable(staticConfigs, numMessages);

    runApplication(validatedConfig(staticConfigs, JOIN_ON_KEY_SQL));

    List<String> outMessages = collectPageKeyAndProfileName();
    Assert.assertEquals(numMessages / 2, outMessages.size());
    Assert.assertEquals(
        TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages), outMessages);
  }

  /** Right join against the three-argument config variant; every page view is expected in the output. */
  @Test
  public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() throws SamzaSqlValidatorException {
    int numMessages = 20;
    resetCapturedState();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    populateProfileTable(staticConfigs, numMessages);
    String sql = ENRICH_FROM_PROFILE + RIGHT_JOIN_PAGEVIEW + " on p.__key__ = pv.profileId";

    runApplication(validatedConfig(staticConfigs, sql));

    List<String> outMessages = collectPageKeyAndProfileName();
    Assert.assertEquals(numMessages, outMessages.size());
    Assert.assertEquals(
        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages), outMessages);
  }

  /** A join condition that combines two equalities with {@code and} must raise {@link SamzaException}. */
  @Test(expected = SamzaException.class)
  public void testJoinConditionWithMoreThanOneConjunction() throws SamzaSqlValidatorException {
    int numMessages = 20;
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    String sql =
        ENRICH_FROM_PROFILE
            + RIGHT_JOIN_PAGEVIEW
            + " on p.__key__ = pv.profileId and p.__key__ = pv.pageKey"
            + NAME_NULL_OR_NON_ZERO_FILTER;

    runApplication(validatedConfig(staticConfigs, sql));
  }

  /** A join condition on {@code p.id} instead of {@code p.__key__} must raise {@link SamzaException}. */
  @Test(expected = SamzaException.class)
  public void testJoinConditionMissing__key__() throws SamzaSqlValidatorException {
    int numMessages = 20;
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    String sql =
        ENRICH_FROM_PROFILE + RIGHT_JOIN_PAGEVIEW + " on p.id = pv.profileId" + NAME_NULL_OR_NON_ZERO_FILTER;

    runApplication(validatedConfig(staticConfigs, sql));
  }

  /**
   * Uses the profile table as both join source and insert target.
   *
   * <p>NOTE(review): the method name says "RightOuterJoin" but the statement uses a plain
   * {@code join}; the name is kept so existing references to the test keep working.
   */
  @Test
  public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws SamzaSqlValidatorException {
    int numMessages = 21;
    resetCapturedState();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    populateProfileTable(staticConfigs, numMessages);

    // The query reads messages from a stream and deletes the corresponding records from the table.
    // Since the stream has alternate messages with a null foreign key, only half of the messages
    // join successfully and hence only half of the records in the table are deleted. Although the
    // join is redundant here, it is kept for testing purposes.
    runApplication(validatedConfig(staticConfigs, DELETE_JOINED_PROFILES_SQL));

    Assert.assertEquals((numMessages + 1) / 2, RemoteStoreIOResolverTestFactory.records.size());
  }

  /** A {@code __key__} plus {@code 'DELETE'} op projection must pass validation. */
  @Test
  public void testDeleteOpValidation() throws SamzaSqlValidatorException {
    int numMessages = 1;
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);

    validatedConfig(staticConfigs, DELETE_JOINED_PROFILES_SQL);
  }

  /** An {@code 'UPDATE'} value for {@code __op__} must be rejected by the validator. */
  @Test (expected = SamzaSqlValidatorException.class)
  public void testUnsupportedOpValidation() throws SamzaSqlValidatorException {
    int numMessages = 1;
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    String sql =
        "Insert into testRemoteStore.Profile.`$table` "
            + "select p.__key__ as __key__, 'UPDATE' as __op__ "
            + "from testRemoteStore.Profile.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.__key__ = pv.profileId";

    validatedConfig(staticConfigs, sql);
  }

  /**
   * An {@code __op__} projection whose other column is aliased {@code pageKey} rather than
   * {@code __key__} must be rejected by the validator.
   *
   * <p>NOTE(review): the method name mentions a delete op but the statement projects
   * {@code 'UPDATE'}; confirm whether {@code 'DELETE'} was intended so that the rejection can only
   * be caused by the missing key.
   */
  @Test (expected = SamzaSqlValidatorException.class)
  public void testNonKeyWithDeleteOpValidation() throws SamzaSqlValidatorException {
    int numMessages = 1;
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true);
    String sql =
        "Insert into testRemoteStore.Profile.`$table` "
            + "select p.__key__ as pageKey, 'UPDATE' as __op__ "
            + "from testRemoteStore.Profile.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.__key__ = pv.profileId ";

    validatedConfig(staticConfigs, sql);
  }

  /**
   * Fills the remote profile table from {@code testavro.PROFILE} and checks the record count.
   *
   * <p>Overwrites {@code CFG_SQL_STMTS_JSON} in {@code staticConfigs}; callers set their own
   * statement afterwards. The statement is run without going through the validator.
   *
   * @param staticConfigs mutable configuration used to run the populate job
   * @param numMessages number of records expected in the remote store afterwards
   */
  private void populateProfileTable(Map<String, String> staticConfigs, int numMessages) {
    RemoteStoreIOResolverTestFactory.records.clear();
    String sql = "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE";
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(Arrays.asList(sql)));
    runApplication(new MapConfig(staticConfigs));
    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
  }

  /** Clears the static collections the tests assert on. */
  private static void resetCapturedState() {
    TestAvroSystemFactory.messages.clear();
    RemoteStoreIOResolverTestFactory.records.clear();
  }

  /**
   * Stores {@code sql} as the only statement in {@code staticConfigs}, snapshots the map into a
   * {@link Config} and runs the validator on the statement.
   *
   * @param staticConfigs mutable configuration; its {@code CFG_SQL_STMTS_JSON} entry is replaced
   * @param sql the single SQL statement to register and validate
   * @return the configuration the statement was validated against
   * @throws SamzaSqlValidatorException if the validator rejects the statement
   */
  private static Config validatedConfig(Map<String, String> staticConfigs, String sql)
      throws SamzaSqlValidatorException {
    List<String> sqlStmts = Arrays.asList(sql);
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
    Config config = new MapConfig(staticConfigs);
    new SamzaSqlValidator(config).validate(sqlStmts);
    return config;
  }

  /**
   * Populates the profile table, then validates and runs {@code sql} with the plan optimizer flag
   * set as requested.
   *
   * @param sql enrichment statement to run
   * @param numMessages number of messages passed to the config factory and expected in the profile table
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   * @return the captured output rendered by {@link #collectPageKeyAndProfileName()}
   * @throws SamzaSqlValidatorException if the validator rejects the statement
   */
  private List<String> runProfileJoin(String sql, int numMessages, boolean enableOptimizer)
      throws SamzaSqlValidatorException {
    resetCapturedState();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
    populateProfileTable(staticConfigs, numMessages);
    // The flag is set only after the table has been populated, so the populate job runs without it.
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
    runApplication(validatedConfig(staticConfigs, sql));
    return collectPageKeyAndProfileName();
  }

  /**
   * Renders every captured output message as {@code "<pageKey>,<profileName>"}, using the text
   * {@code "null"} when the profile name is absent.
   */
  private static List<String> collectPageKeyAndProfileName() {
    return TestAvroSystemFactory.messages.stream()
        .map(envelope -> {
          GenericRecord record = (GenericRecord) envelope.getMessage();
          // String.valueOf(Object) yields "null" for a null reference and toString() otherwise.
          return record.get("pageKey").toString() + "," + String.valueOf(record.get("profileName"));
        })
        .collect(Collectors.toList());
  }
}