blob: 012b21a4b2e821052a09e1f5de993926f70b89a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressPointFields;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
*/
@SuppressPointFields(bugUrl="https://issues.apache.org/jira/browse/SOLR-10960")
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class JDBCStreamTest extends SolrCloudTestCase {
private static final String COLLECTIONORALIAS = "jdbc";
private static final int TIMEOUT = 30;
private static final String id = "id";
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
boolean useAlias = random().nextBoolean();
String collection;
if (useAlias) {
collection = COLLECTIONORALIAS + "_collection";
} else {
collection = COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
if (useAlias) {
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
}
}
@BeforeClass
public static void setupDatabase() throws Exception {
// Initialize Database
// Ok, so.....hsqldb is doing something totally weird so I thought I'd take a moment to explain it.
// According to http://www.hsqldb.org/doc/1.8/guide/guide.html#N101EF, section "Components of SQL Expressions", clause "name",
// "When an SQL statement is issued, any lowercase characters in unquoted identifiers are converted to uppercase."
// :( Like seriously....
// So, for this reason and to simplify writing these tests I've decided that in all statements all table and column names
// will be in UPPERCASE. This is to ensure things look and behave consistently. Note that this is not a requirement of the
// JDBCStream and is only a carryover from the driver we are testing with.
Class.forName("org.hsqldb.jdbcDriver").newInstance();
Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement();
statement.executeUpdate("create table COUNTRIES(CODE varchar(3) not null primary key, COUNTRY_NAME varchar(50), DELETED char(1) default 'N')");
statement.executeUpdate("create table PEOPLE(ID int not null primary key, NAME varchar(50), COUNTRY_CODE char(2), DELETED char(1) default 'N')");
statement.executeUpdate("create table PEOPLE_SPORTS(ID int not null primary key, PERSON_ID int, SPORT_NAME varchar(50), DELETED char(1) default 'N')");
statement.executeUpdate("create table UNSUPPORTED_COLUMNS(ID int not null primary key, UNSP binary)");
statement.executeUpdate("create table DUAL(ID int not null primary key)");
statement.executeUpdate("insert into DUAL values(1)");
}
@AfterClass
public static void teardownDatabase() throws SQLException {
Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement();
statement.executeUpdate("shutdown");
}
@Before
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
@Before
public void cleanDatabase() throws Exception {
// Clear database
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("delete from COUNTRIES WHERE 1=1");
statement.executeUpdate("delete from PEOPLE WHERE 1=1");
statement.executeUpdate("delete from PEOPLE_SPORTS WHERE 1=1");
statement.executeUpdate("delete from UNSUPPORTED_COLUMNS WHERE 1=1");
}
}
@Test
public void testJDBCSelect() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
}
TupleStream stream;
List<Tuple> tuples;
// Simple 1
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE",
new FieldComparator("CODE", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assert(tuples.size() == 4);
assertOrderOf(tuples, "CODE", "NL", "NO", "NP", "US");
assertOrderOf(tuples, "COUNTRY_NAME", "Netherlands", "Norway", "Nepal", "United States");
// Simple 2
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by COUNTRY_NAME",
new FieldComparator("COUNTRY_NAME", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assertEquals(4, tuples.size());
assertOrderOf(tuples, "CODE", "NP", "NL", "NO", "US");
assertOrderOf(tuples, "COUNTRY_NAME", "Nepal", "Netherlands", "Norway", "United States");
// Additional Types
String query = "select 1 as ID1, {ts '2017-02-18 12:34:56.789'} as TS1, {t '01:02:03'} as T1, "
+ "{d '1593-03-14'} as D1, cast(12.34 AS DECIMAL(4,2)) as DEC4_2, "
+ "cast(1234 AS DECIMAL(4,0)) as DEC4_0, cast('big stuff' as CLOB(100)) as CLOB1 "
+ "from DUAL order by ID1";
stream = new JDBCStream("jdbc:hsqldb:mem:.", query, new FieldComparator("ID1", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assertEquals(1, tuples.size());
Tuple t;
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery(query);
rs.next();
t = tuples.iterator().next();
assertString(t, "CLOB1", rs.getString("CLOB1"));
assertString(t, "TS1", rs.getTimestamp("TS1").toInstant().toString());
assertString(t, "T1", rs.getTime("T1").toString());
assertString(t, "D1", rs.getDate("D1").toString());
assertDouble(t, "DEC4_2", rs.getDouble("DEC4_2"));
assertLong(t, "DEC4_0", rs.getLong("DEC4_0"));
}
}
@Test
public void testJDBCJoin() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','NI')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NG')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NF')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NE')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','NC')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NZ')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','NR')");
}
TupleStream stream;
List<Tuple> tuples;
// Simple 1
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE where COUNTRIES.CODE = 'NL' order by PEOPLE.ID", new FieldComparator("ID", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assertEquals(3, tuples.size());
assertOrderOf(tuples, "ID", 11, 17, 19);
assertOrderOf(tuples, "NAME", "Emma", "Mia", "Olivia");
}
@Test
public void testJDBCSolrMerge() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
}
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
// Load Solr
new UpdateRequest()
.add(id, "0", "code_s", "GB", "name_s", "Great Britian")
.add(id, "1", "code_s", "CA", "name_s", "Canada")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
List<Tuple> tuples;
try {
// Simple 1
TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>() {{
put("CODE", "code_s");
put("COUNTRY_NAME", "name_s");
}});
TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream, searchStream});
mergeStream.setStreamContext(streamContext);
tuples = getTuples(mergeStream);
assertEquals(7, tuples.size());
assertOrderOf(tuples, "code_s", "AL", "CA", "GB", "NL", "NO", "NP", "US");
assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
} finally {
solrClientCache.close();
}
}
@Test
public void testJDBCSolrInnerJoinExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("jdbc", JDBCStream.class);
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
try {
// Basic test
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
+ " select("
+ " jdbc(fetchSize=300, connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ " ID as personId,"
+ " NAME as personName,"
+ " COUNTRY_NAME as country"
+ " ),"
+ " on=\"personId\""
+ ")";
stream = factory.constructStream(expression);
String expr = ((Expressible)stream).toExpression(factory).toString();
assertTrue(expr.contains("fetchSize=300"));
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(10, tuples.size());
assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
} finally {
solrClientCache.close();
}
}
@Test
public void testJDBCSolrInnerJoinExpressionWithProperties() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("jdbc", JDBCStream.class);
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
try {
// Basic test for no alias
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
+ " select("
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ " ID as personId,"
+ " NAME as personName,"
+ " COUNTRY_NAME as country"
+ " ),"
+ " on=\"personId\""
+ ")";
stream = factory.constructStream(expression);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(10, tuples.size());
assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
// Basic test for alias
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
+ " select("
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
+ " PERSONID as personId,"
+ " NAME as personName,"
+ " COUNTRY_NAME as country"
+ " ),"
+ " on=\"personId\""
+ ")";
stream = factory.constructStream(expression);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(10, tuples.size());
assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
} finally {
solrClientCache.close();
}
}
@Test
public void testJDBCSolrInnerJoinRollupExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("min", MinMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("count", CountMetric.class)
;
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
try {
// Basic test
expression =
"rollup("
+ " hashJoin("
+ " hashed=select("
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
+ " select("
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
+ " ID as personId,"
+ " NAME as personName,"
+ " COUNTRY_NAME as country"
+ " ),"
+ " on=\"personId\""
+ " ),"
+ " over=\"country\","
+ " max(rating),"
+ " min(rating),"
+ " avg(rating),"
+ " count(*)"
+ ")";
stream = factory.constructStream(expression);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(2, tuples.size());
Tuple tuple = tuples.get(0);
assertEquals("Netherlands", tuple.getString("country"));
assertEquals(4.3D , tuple.getDouble("max(rating)") , 0.0001);
assertEquals(2.2D , tuple.getDouble("min(rating)"), 0.0001);
assertEquals(3.6D, tuple.getDouble("avg(rating)"), 0.0001);
assertEquals(6D , tuple.getDouble("count(*)"), 0.0001);
tuple = tuples.get(1);
assertEquals("United States", tuple.getString("country"));
assertEquals(5D , tuple.getDouble("max(rating)"), 0.0001);
assertEquals(3D , tuple.getDouble("min(rating)"), 0.0001);
assertEquals(3.95D , tuple.getDouble("avg(rating)"),0.0001);
assertEquals(4D , tuple.getDouble("count(*)"), 0.0001);
} finally {
solrClientCache.close();
}
}
@Test(expected=IOException.class)
public void testUnsupportedColumns() throws Exception {
// No need to load table with any data
TupleStream stream;
// Simple 1
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select ID,UNSP from UNSUPPORTED_COLUMNS",
new FieldComparator("CODE", ComparatorOrder.ASCENDING));
getTuples(stream);
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>();
for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
tuples.add(t);
}
tupleStream.close();
return tuples;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... values) throws Exception {
int i = 0;
for(int val : values) {
Tuple t = tuples.get(i);
Long tip = (Long)t.get(fieldName);
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, double... values) throws Exception {
int i = 0;
for(double val : values) {
Tuple t = tuples.get(i);
double tip = (double)t.get(fieldName);
assertEquals("Found value:"+tip+" expecting:"+val, val, tip, 0.00001);
++i;
}
return true;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, String... values) throws Exception {
int i = 0;
for(String val : values) {
Tuple t = tuples.get(i);
if(null == val){
if(null != t.get(fieldName)){
throw new Exception("Found value:"+(String)t.get(fieldName)+" expecting:null");
}
}
else{
String tip = (String)t.get(fieldName);
if(!tip.equals(val)) {
throw new Exception("Found value:"+tip+" expecting:"+val);
}
}
++i;
}
return true;
}
protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
if(!tuple.getFields().containsKey(field)){
throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
}
}
}
return true;
}
protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
if(tuple.getFields().containsKey(field)){
throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
}
}
}
return true;
}
public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
long lv = (long)tuple.get(fieldName);
if(lv != l) {
throw new Exception("Longs not equal:"+l+" : "+lv);
}
return true;
}
public boolean assertDouble(Tuple tuple, String fieldName, double d) throws Exception {
double dv = (double)tuple.get(fieldName);
if(dv != d) {
throw new Exception("Doubles not equal:"+d+" : "+dv);
}
return true;
}
public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
String actual = (String)tuple.get(fieldName);
if( (null == expected && null != actual) ||
(null != expected && null == actual) ||
(null != expected && !expected.equals(actual))){
throw new Exception("Longs not equal:"+expected+" : "+actual);
}
return true;
}
}