| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.client.solrj.io.graph; |
| |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.LuceneTestCase.Slow; |
| import org.apache.solr.client.solrj.SolrRequest; |
| import org.apache.solr.client.solrj.embedded.JettySolrRunner; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.impl.InputStreamResponseParser; |
| import org.apache.solr.client.solrj.io.SolrClientCache; |
| import org.apache.solr.client.solrj.io.Tuple; |
| import org.apache.solr.client.solrj.io.comp.ComparatorOrder; |
| import org.apache.solr.client.solrj.io.comp.FieldComparator; |
| import org.apache.solr.client.solrj.io.stream.*; |
| import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; |
| import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; |
| import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; |
| import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; |
| import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; |
| import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.cloud.AbstractDistribZkTestBase; |
| import org.apache.solr.cloud.SolrCloudTestCase; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.util.BaseTestHarness; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so |
| * SolrStream will get fully exercised through these tests. |
| * |
| **/ |
| |
| @Slow |
| @LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"}) |
| public class GraphExpressionTest extends SolrCloudTestCase { |
| |
| private static final String COLLECTION = "collection1"; |
| |
| private static final String id = "id"; |
| |
| private static final int TIMEOUT = 30; |
| |
| @BeforeClass |
| public static void setupCluster() throws Exception { |
| configureCluster(2) |
| .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf")) |
| .configure(); |
| |
| CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient()); |
| AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), |
| false, true, TIMEOUT); |
| } |
| |
| @Before |
| public void cleanIndex() throws Exception { |
| new UpdateRequest() |
| .deleteByQuery("*:*") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| } |
| |
| @Test |
| // commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018 |
| @SuppressWarnings({"unchecked"}) |
| public void testShortestPathStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows") |
| .add(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows") |
| .add(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows") |
| .add(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows") |
| .add(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows") |
| .add(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows") |
| .add(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows") |
| .add(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows") |
| .add(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows") |
| .add(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows") |
| .add(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows") |
| .add(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows") |
| .add(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows") |
| .add(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows") |
| .add(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| List<Tuple> tuples = null; |
| Set<String> paths = null; |
| ShortestPathStream stream = null; |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| context.setSolrClientCache(cache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("shortestPath", ShortestPathStream.class); |
| |
| @SuppressWarnings({"rawtypes"}) |
| Map params = new HashMap(); |
| params.put("fq", "predicate_s:knows"); |
| |
| stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " + |
| "from=\"jim\", " + |
| "to=\"steve\"," + |
| "edge=\"from_s=to_s\"," + |
| "fq=\"predicate_s:knows\","+ |
| "threads=\"3\","+ |
| "partitionSize=\"3\","+ |
| "maxDepth=\"6\")"); |
| |
| stream.setStreamContext(context); |
| paths = new HashSet<>(); |
| tuples = getTuples(stream); |
| |
| assertTrue(tuples.size() == 2); |
| |
| for(Tuple tuple : tuples) { |
| paths.add(tuple.getStrings("path").toString()); |
| } |
| |
| assertTrue(paths.contains("[jim, dave, alex, steve]")); |
| assertTrue(paths.contains("[jim, stan, mary, steve]")); |
| |
| //Test with batch size of 1 |
| |
| params.put("fq", "predicate_s:knows"); |
| |
| stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " + |
| "from=\"jim\", " + |
| "to=\"steve\"," + |
| "edge=\"from_s=to_s\"," + |
| "fq=\"predicate_s:knows\","+ |
| "threads=\"3\","+ |
| "partitionSize=\"1\","+ |
| "maxDepth=\"6\")"); |
| |
| stream.setStreamContext(context); |
| paths = new HashSet<>(); |
| tuples = getTuples(stream); |
| |
| assertTrue(tuples.size() == 2); |
| |
| for(Tuple tuple : tuples) { |
| paths.add(tuple.getStrings("path").toString()); |
| } |
| |
| assertTrue(paths.contains("[jim, dave, alex, steve]")); |
| assertTrue(paths.contains("[jim, stan, mary, steve]")); |
| |
| //Test with bad predicate |
| |
| |
| stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " + |
| "from=\"jim\", " + |
| "to=\"steve\"," + |
| "edge=\"from_s=to_s\"," + |
| "fq=\"predicate_s:crap\","+ |
| "threads=\"3\","+ |
| "partitionSize=\"3\","+ |
| "maxDepth=\"6\")"); |
| |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| assertTrue(tuples.size() == 0); |
| |
| //Test with depth 2 |
| |
| stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " + |
| "from=\"jim\", " + |
| "to=\"steve\"," + |
| "edge=\"from_s=to_s\"," + |
| "fq=\"predicate_s:knows\","+ |
| "threads=\"3\","+ |
| "partitionSize=\"3\","+ |
| "maxDepth=\"2\")"); |
| |
| |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| assertTrue(tuples.size() == 0); |
| |
| //Take out alex |
| params.put("fq", "predicate_s:knows NOT to_s:alex"); |
| |
| stream = (ShortestPathStream)factory.constructStream("shortestPath(collection1, " + |
| "from=\"jim\", " + |
| "to=\"steve\"," + |
| "edge=\"from_s=to_s\"," + |
| "fq=\" predicate_s:knows NOT to_s:alex\","+ |
| "threads=\"3\","+ |
| "partitionSize=\"3\","+ |
| "maxDepth=\"6\")"); |
| |
| |
| stream.setStreamContext(context); |
| paths = new HashSet<>(); |
| tuples = getTuples(stream); |
| assertTrue(tuples.size() == 1); |
| |
| for(Tuple tuple : tuples) { |
| paths.add(tuple.getStrings("path").toString()); |
| } |
| |
| assertTrue(paths.contains("[jim, stan, mary, steve]")); |
| |
| cache.close(); |
| |
| } |
| |
| @Test |
| public void testGatherNodesStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:23:50Z") |
| .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30", "time_ten_seconds_s", "2020-09-24T18:23:40Z") |
| .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1", "time_ten_seconds_s", "2020-09-24T18:23:30Z") |
| .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2", "time_ten_seconds_s", "2020-09-24T18:23:20Z") |
| .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5", "time_ten_seconds_s", "2020-09-24T18:23:10Z") |
| .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:23:00Z") |
| .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20", "time_ten_seconds_s", "2020-09-24T18:22:50Z") |
| .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:40Z") |
| .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:30Z") |
| .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40", "time_ten_seconds_s", "2020-09-24T18:22:20Z") |
| .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:10Z") |
| .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10", "time_ten_seconds_s", "2020-09-24T18:22:00Z") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| List<Tuple> tuples = null; |
| Set<String> paths = null; |
| GatherNodesStream stream = null; |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| context.setSolrClientCache(cache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("gatherNodes", GatherNodesStream.class) |
| .withFunctionName("nodes", GatherNodesStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("random", RandomStream.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("sort", SortStream.class) |
| .withFunctionName("max", MaxMetric.class); |
| |
| String expr = "nodes(collection1, " + |
| "walk=\"product1->product_s\"," + |
| "gather=\"basket_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 4); |
| |
| assertTrue(tuples.get(0).getString("node").equals("basket1")); |
| assertTrue(tuples.get(1).getString("node").equals("basket2")); |
| assertTrue(tuples.get(2).getString("node").equals("basket3")); |
| assertTrue(tuples.get(3).getString("node").equals("basket4")); |
| |
| |
| //Test maxDocFreq param |
| String docFreqExpr = "gatherNodes(collection1, " + |
| "walk=\"product1, product7->product_s\"," + |
| "maxDocFreq=\"2\","+ |
| "gather=\"basket_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(docFreqExpr); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 1); |
| assertTrue(tuples.get(0).getString("node").equals("basket2")); |
| |
| |
| |
| String expr2 = "gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->basket_s\"," + |
| "gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr2); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| |
| stream.setStreamContext(context); |
| |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| |
| |
| assertTrue(tuples.size() == 5); |
| |
| |
| assertTrue(tuples.get(0).getString("node").equals("product3")); |
| assertTrue(tuples.get(0).getDouble("count(*)").equals(3.0D)); |
| |
| assertTrue(tuples.get(1).getString("node").equals("product4")); |
| assertTrue(tuples.get(1).getDouble("count(*)").equals(2.0D)); |
| assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(30.0D)); |
| assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(60.0D)); |
| assertTrue(tuples.get(1).getDouble("min(price_f)").equals(20.0D)); |
| assertTrue(tuples.get(1).getDouble("max(price_f)").equals(40.0D)); |
| |
| assertTrue(tuples.get(2).getString("node").equals("product5")); |
| assertTrue(tuples.get(2).getDouble("count(*)").equals(1.0D)); |
| assertTrue(tuples.get(3).getString("node").equals("product6")); |
| assertTrue(tuples.get(3).getDouble("count(*)").equals(1.0D)); |
| assertTrue(tuples.get(4).getString("node").equals("product7")); |
| assertTrue(tuples.get(4).getDouble("count(*)").equals(1.0D)); |
| |
| //Test list of root nodes |
| expr = "gatherNodes(collection1, " + |
| "walk=\"product4, product7->product_s\"," + |
| "gather=\"basket_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 3); |
| assertTrue(tuples.get(0).getString("node").equals("basket2")); |
| assertTrue(tuples.get(1).getString("node").equals("basket3")); |
| assertTrue(tuples.get(2).getString("node").equals("basket4")); |
| |
| //Test with negative filter query |
| |
| expr = "gatherNodes(collection1, " + |
| "walk=\"product4, product7->product_s\"," + |
| "gather=\"basket_s\", fq=\"-basket_s:basket4\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 2); |
| assertTrue(tuples.get(0).getString("node").equals("basket2")); |
| assertTrue(tuples.get(1).getString("node").equals("basket3")); |
| |
| |
| //Test the window without lag |
| |
| expr = "nodes(collection1, random(collection1, q=\"id:(1 2)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"3\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 5); |
| assertTrue(tuples.get(0).getString("node").equals("1")); |
| assertTrue(tuples.get(1).getString("node").equals("2")); |
| assertTrue(tuples.get(2).getString("node").equals("3")); |
| assertTrue(tuples.get(3).getString("node").equals("4")); |
| assertTrue(tuples.get(4).getString("node").equals("5")); |
| |
| |
| //Test window with lag |
| |
| expr = "nodes(collection1, random(collection1, q=\"id:(1)\", fl=\"time_ten_seconds_s\"), walk=\"time_ten_seconds_s->time_ten_seconds_s\", gather=\"id\", window=\"2\", lag=\"2\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 2); |
| assertTrue(tuples.get(0).getString("node").equals("3")); |
| assertTrue(tuples.get(1).getString("node").equals("4")); |
| cache.close(); |
| } |
| |
| |
| @Test |
| public void testScoreNodesStream() throws Exception { |
| |
| |
| new UpdateRequest() |
| .add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "1") |
| .add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "1") |
| .add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "100") |
| .add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "1") |
| .add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "1") |
| .add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "1") |
| .add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "1") |
| .add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "1") |
| .add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "1") |
| .add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "1") |
| .add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "1") |
| .add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "1") |
| .add(id, "12", "basket_s", "basket5", "product_s", "product1", "price_f", "1") |
| .add(id, "13", "basket_s", "basket6", "product_s", "product1", "price_f", "1") |
| .add(id, "14", "basket_s", "basket7", "product_s", "product1", "price_f", "1") |
| .add(id, "15", "basket_s", "basket4", "product_s", "product1", "price_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| List<Tuple> tuples = null; |
| TupleStream stream = null; |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| context.setSolrClientCache(cache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withDefaultZkHost(cluster.getZkServer().getZkAddress()) |
| .withFunctionName("gatherNodes", GatherNodesStream.class) |
| .withFunctionName("scoreNodes", ScoreNodesStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("sort", SortStream.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class); |
| |
| String expr = "gatherNodes(collection1, " + |
| "walk=\"product3->product_s\"," + |
| "gather=\"basket_s\")"; |
| |
| |
| String expr2 = "sort(by=\"nodeScore desc\", " + |
| "scoreNodes(gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->basket_s\"," + |
| "gather=\"product_s\", " + |
| "count(*), " + |
| "avg(price_f), " + |
| "sum(price_f), " + |
| "min(price_f), " + |
| "max(price_f))))"; |
| |
| stream = factory.constructStream(expr2); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Tuple tuple0 = tuples.get(0); |
| assert(tuple0.getString("node").equals("product4")); |
| assert(tuple0.getLong("docFreq") == 2); |
| assert(tuple0.getLong("count(*)") == 2); |
| |
| Tuple tuple1 = tuples.get(1); |
| assert(tuple1.getString("node").equals("product1")); |
| assert(tuple1.getLong("docFreq") == 8); |
| assert(tuple1.getLong("count(*)") == 3); |
| |
| Tuple tuple2 = tuples.get(2); |
| assert(tuple2.getString("node").equals("product5")); |
| assert(tuple2.getLong("docFreq") == 1); |
| assert(tuple2.getLong("count(*)") == 1); |
| |
| |
| //Test using a different termFreq field then the default count(*) |
| expr2 = "sort(by=\"nodeScore desc\", " + |
| "scoreNodes(termFreq=\"avg(price_f)\",gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->basket_s\"," + |
| "gather=\"product_s\", " + |
| "count(*), " + |
| "avg(price_f), " + |
| "sum(price_f), " + |
| "min(price_f), " + |
| "max(price_f))))"; |
| |
| stream = factory.constructStream(expr2); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| tuple0 = tuples.get(0); |
| assert(tuple0.getString("node").equals("product5")); |
| assert(tuple0.getLong("docFreq") == 1); |
| assert(tuple0.getDouble("avg(price_f)") == 100); |
| |
| tuple1 = tuples.get(1); |
| assert(tuple1.getString("node").equals("product4")); |
| assert(tuple1.getLong("docFreq") == 2); |
| assert(tuple1.getDouble("avg(price_f)") == 1); |
| |
| tuple2 = tuples.get(2); |
| assert(tuple2.getString("node").equals("product1")); |
| assert(tuple2.getLong("docFreq") == 8); |
| assert(tuple2.getDouble("avg(price_f)") == 1); |
| |
| cache.close(); |
| } |
| |
| |
| @Test |
| public void testScoreNodesFacetStream() throws Exception { |
| |
| |
| new UpdateRequest() |
| .add(id, "0", "basket_s", "basket1", "product_ss", "product1", "product_ss", "product3", "product_ss", "product5", "price_f", "1") |
| .add(id, "3", "basket_s", "basket2", "product_ss", "product1", "product_ss", "product6", "product_ss", "product7", "price_f", "1") |
| .add(id, "6", "basket_s", "basket3", "product_ss", "product4", "product_ss","product3", "product_ss","product1", "price_f", "1") |
| .add(id, "9", "basket_s", "basket4", "product_ss", "product4", "product_ss", "product3", "product_ss", "product1","price_f", "1") |
| //.add(id, "12", "basket_s", "basket5", "product_ss", "product1", "price_f", "1") |
| //.add(id, "13", "basket_s", "basket6", "product_ss", "product1", "price_f", "1") |
| //.add(id, "14", "basket_s", "basket7", "product_ss", "product1", "price_f", "1") |
| //.add(id, "15", "basket_s", "basket4", "product_ss", "product1", "price_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| List<Tuple> tuples = null; |
| TupleStream stream = null; |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| context.setSolrClientCache(cache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withDefaultZkHost(cluster.getZkServer().getZkAddress()) |
| .withFunctionName("gatherNodes", GatherNodesStream.class) |
| .withFunctionName("scoreNodes", ScoreNodesStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("facet", FacetStream.class) |
| .withFunctionName("sort", SortStream.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class); |
| |
| String expr = "sort(by=\"nodeScore desc\",scoreNodes(facet(collection1, q=\"product_ss:product3\", buckets=\"product_ss\", bucketSorts=\"count(*) desc\", bucketSizeLimit=100, count(*))))"; |
| |
| stream = factory.constructStream(expr); |
| |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| Tuple tuple = tuples.get(0); |
| assert(tuple.getString("node").equals("product3")); |
| assert(tuple.getLong("docFreq") == 3); |
| assert(tuple.getLong("count(*)") == 3); |
| |
| Tuple tuple0 = tuples.get(1); |
| assert(tuple0.getString("node").equals("product4")); |
| assert(tuple0.getLong("docFreq") == 2); |
| assert(tuple0.getLong("count(*)") == 2); |
| |
| Tuple tuple1 = tuples.get(2); |
| assert(tuple1.getString("node").equals("product1")); |
| assert(tuple1.getLong("docFreq") == 4); |
| assert(tuple1.getLong("count(*)") == 3); |
| |
| Tuple tuple2 = tuples.get(3); |
| assert(tuple2.getString("node").equals("product5")); |
| assert(tuple2.getLong("docFreq") == 1); |
| assert(tuple2.getLong("count(*)") == 1); |
| |
| |
| cache.close(); |
| } |
| |
| |
| |
| |
| |
| @Test |
| public void testGatherNodesFriendsStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim") |
| .add(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam") |
| .add(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max") |
| .add(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip") |
| .add(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve") |
| .add(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| List<Tuple> tuples = null; |
| GatherNodesStream stream = null; |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| context.setSolrClientCache(cache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("gatherNodes", GatherNodesStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("hashJoin", HashJoinStream.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class); |
| |
| String expr = "gatherNodes(collection1, " + |
| "walk=\"bill->from_s\"," + |
| "gather=\"to_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 3); |
| assertTrue(tuples.get(0).getString("node").equals("jim")); |
| assertTrue(tuples.get(1).getString("node").equals("max")); |
| assertTrue(tuples.get(2).getString("node").equals("sam")); |
| |
| //Test scatter branches, leaves and trackTraversal |
| |
| expr = "gatherNodes(collection1, " + |
| "walk=\"bill->from_s\"," + |
| "gather=\"to_s\","+ |
| "scatter=\"branches, leaves\", trackTraversal=\"true\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 4); |
| assertTrue(tuples.get(0).getString("node").equals("bill")); |
| assertTrue(tuples.get(0).getLong("level").equals(0L)); |
| assertTrue(tuples.get(0).getStrings("ancestors").size() == 0); |
| assertTrue(tuples.get(1).getString("node").equals("jim")); |
| assertTrue(tuples.get(1).getLong("level").equals(1L)); |
| List<String> ancestors = tuples.get(1).getStrings("ancestors"); |
| System.out.println("##################### Ancestors:"+ancestors); |
| assert(ancestors.size() == 1); |
| assert(ancestors.get(0).equals("bill")); |
| |
| assertTrue(tuples.get(2).getString("node").equals("max")); |
| assertTrue(tuples.get(2).getLong("level").equals(1L)); |
| ancestors = tuples.get(2).getStrings("ancestors"); |
| assert(ancestors.size() == 1); |
| assert(ancestors.get(0).equals("bill")); |
| |
| assertTrue(tuples.get(3).getString("node").equals("sam")); |
| assertTrue(tuples.get(3).getLong("level").equals(1L)); |
| ancestors = tuples.get(3).getStrings("ancestors"); |
| assert(ancestors.size() == 1); |
| assert(ancestors.get(0).equals("bill")); |
| |
| // Test query root |
| |
| expr = "gatherNodes(collection1, " + |
| "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ |
| "walk=\"from_s->from_s\"," + |
| "gather=\"to_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 3); |
| assertTrue(tuples.get(0).getString("node").equals("jim")); |
| assertTrue(tuples.get(1).getString("node").equals("max")); |
| assertTrue(tuples.get(2).getString("node").equals("sam")); |
| |
| |
| // Test query root scatter branches |
| |
| expr = "gatherNodes(collection1, " + |
| "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ |
| "walk=\"from_s->from_s\"," + |
| "gather=\"to_s\", scatter=\"branches, leaves\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| assertTrue(tuples.size() == 4); |
| assertTrue(tuples.get(0).getString("node").equals("bill")); |
| assertTrue(tuples.get(0).getLong("level").equals(0L)); |
| assertTrue(tuples.get(1).getString("node").equals("jim")); |
| assertTrue(tuples.get(1).getLong("level").equals(1L)); |
| assertTrue(tuples.get(2).getString("node").equals("max")); |
| assertTrue(tuples.get(2).getLong("level").equals(1L)); |
| assertTrue(tuples.get(3).getString("node").equals("sam")); |
| assertTrue(tuples.get(3).getLong("level").equals(1L)); |
| |
| expr = "gatherNodes(collection1, " + |
| "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ |
| "walk=\"from_s->from_s\"," + |
| "gather=\"to_s\")"; |
| |
| String expr2 = "gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->from_s\"," + |
| "gather=\"to_s\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr2); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| |
| assertTrue(tuples.size() == 3); |
| assertTrue(tuples.get(0).getString("node").equals("ann")); |
| assertTrue(tuples.get(1).getString("node").equals("kip")); |
| assertTrue(tuples.get(2).getString("node").equals("steve")); |
| |
| |
| //Test two traversals in the same expression |
| String expr3 = "hashJoin("+expr2+", hashed="+expr2+", on=\"node\")"; |
| |
| HashJoinStream hstream = (HashJoinStream)factory.constructStream(expr3); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| hstream.setStreamContext(context); |
| |
| tuples = getTuples(hstream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| |
| assertTrue(tuples.size() == 3); |
| assertTrue(tuples.get(0).getString("node").equals("ann")); |
| assertTrue(tuples.get(1).getString("node").equals("kip")); |
| assertTrue(tuples.get(2).getString("node").equals("steve")); |
| |
| //================================= |
| |
| |
| expr = "gatherNodes(collection1, " + |
| "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ |
| "walk=\"from_s->from_s\"," + |
| "gather=\"to_s\")"; |
| |
| expr2 = "gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->from_s\"," + |
| "gather=\"to_s\", scatter=\"branches, leaves\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr2); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| |
| |
| assertTrue(tuples.size() == 7); |
| assertTrue(tuples.get(0).getString("node").equals("ann")); |
| assertTrue(tuples.get(0).getLong("level").equals(2L)); |
| assertTrue(tuples.get(1).getString("node").equals("bill")); |
| assertTrue(tuples.get(1).getLong("level").equals(0L)); |
| assertTrue(tuples.get(2).getString("node").equals("jim")); |
| assertTrue(tuples.get(2).getLong("level").equals(1L)); |
| assertTrue(tuples.get(3).getString("node").equals("kip")); |
| assertTrue(tuples.get(3).getLong("level").equals(2L)); |
| assertTrue(tuples.get(4).getString("node").equals("max")); |
| assertTrue(tuples.get(4).getLong("level").equals(1L)); |
| assertTrue(tuples.get(5).getString("node").equals("sam")); |
| assertTrue(tuples.get(5).getLong("level").equals(1L)); |
| assertTrue(tuples.get(6).getString("node").equals("steve")); |
| assertTrue(tuples.get(6).getLong("level").equals(2L)); |
| |
| //Add a cycle from jim to bill |
| new UpdateRequest() |
| .add(id, "6", "from_s", "jim", "to_s", "bill", "message_t", "Hello steve") |
| .add(id, "7", "from_s", "sam", "to_s", "bill", "message_t", "Hello steve") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| expr = "gatherNodes(collection1, " + |
| "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ |
| "walk=\"from_s->from_s\"," + |
| "gather=\"to_s\", trackTraversal=\"true\")"; |
| |
| expr2 = "gatherNodes(collection1, " + |
| expr+","+ |
| "walk=\"node->from_s\"," + |
| "gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")"; |
| |
| stream = (GatherNodesStream)factory.constructStream(expr2); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING)); |
| |
| assertTrue(tuples.size() == 7); |
| assertTrue(tuples.get(0).getString("node").equals("ann")); |
| assertTrue(tuples.get(0).getLong("level").equals(2L)); |
| //Bill should now have one ancestor |
| assertTrue(tuples.get(1).getString("node").equals("bill")); |
| assertTrue(tuples.get(1).getLong("level").equals(0L)); |
| assertTrue(tuples.get(1).getStrings("ancestors").size() == 2); |
| List<String> anc = tuples.get(1).getStrings("ancestors"); |
| |
| Collections.sort(anc); |
| assertTrue(anc.get(0).equals("jim")); |
| assertTrue(anc.get(1).equals("sam")); |
| |
| assertTrue(tuples.get(2).getString("node").equals("jim")); |
| assertTrue(tuples.get(2).getLong("level").equals(1L)); |
| assertTrue(tuples.get(3).getString("node").equals("kip")); |
| assertTrue(tuples.get(3).getLong("level").equals(2L)); |
| assertTrue(tuples.get(4).getString("node").equals("max")); |
| assertTrue(tuples.get(4).getLong("level").equals(1L)); |
| assertTrue(tuples.get(5).getString("node").equals("sam")); |
| assertTrue(tuples.get(5).getLong("level").equals(1L)); |
| assertTrue(tuples.get(6).getString("node").equals("steve")); |
| assertTrue(tuples.get(6).getLong("level").equals(2L)); |
| |
| cache.close(); |
| |
| } |
| |
| @Test |
| public void testGraphHandler() throws Exception { |
| |
| |
| new UpdateRequest() |
| .add(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim") |
| .add(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam") |
| .add(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max") |
| .add(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip") |
| .add(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve") |
| .add(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve") |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| commit(); |
| |
| List<JettySolrRunner> runners = cluster.getJettySolrRunners(); |
| JettySolrRunner runner = runners.get(0); |
| String url = runner.getBaseUrl().toString(); |
| |
| HttpSolrClient client = getHttpSolrClient(url); |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| |
| |
| String expr = "sort(by=\"node asc\", gatherNodes(collection1, " + |
| "walk=\"bill->from_s\"," + |
| "trackTraversal=\"true\"," + |
| "gather=\"to_s\"))"; |
| |
| params.add("expr", expr); |
| QueryRequest query = new QueryRequest(params); |
| query.setPath("/collection1/graph"); |
| |
| query.setResponseParser(new InputStreamResponseParser("xml")); |
| query.setMethod(SolrRequest.METHOD.POST); |
| |
| NamedList<Object> genericResponse = client.request(query); |
| |
| |
| InputStream stream = (InputStream)genericResponse.get("stream"); |
| InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8); |
| String xml = readString(reader); |
| //Validate the nodes |
| String error = BaseTestHarness.validateXPath(xml, |
| "//graph/node[1][@id ='jim']", |
| "//graph/node[2][@id ='max']", |
| "//graph/node[3][@id ='sam']"); |
| if(error != null) { |
| throw new Exception(error); |
| } |
| //Validate the edges |
| error = BaseTestHarness.validateXPath(xml, |
| "//graph/edge[1][@source ='bill']", |
| "//graph/edge[1][@target ='jim']", |
| "//graph/edge[2][@source ='bill']", |
| "//graph/edge[2][@target ='max']", |
| "//graph/edge[3][@source ='bill']", |
| "//graph/edge[3][@target ='sam']"); |
| |
| if(error != null) { |
| throw new Exception(error); |
| } |
| |
| client.close(); |
| } |
| |
| |
| |
| |
| |
| |
| private String readString(InputStreamReader reader) throws Exception{ |
| StringBuilder builder = new StringBuilder(); |
| int c = 0; |
| while((c = reader.read()) != -1) { |
| builder.append(((char)c)); |
| } |
| |
| return builder.toString(); |
| } |
| |
| |
| |
| |
| protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException { |
| tupleStream.open(); |
| List<Tuple> tuples = new ArrayList<>(); |
| for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) { |
| tuples.add(t); |
| } |
| tupleStream.close(); |
| return tuples; |
| } |
| |
| protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception { |
| int i = 0; |
| for(int val : ids) { |
| Tuple t = tuples.get(i); |
| Long tip = (Long)t.get(fieldName); |
| if(tip.intValue() != val) { |
| throw new Exception("Found value:"+tip.intValue()+" expecting:"+val); |
| } |
| ++i; |
| } |
| return true; |
| } |
| |
| public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception { |
| long lv = (long)tuple.get(fieldName); |
| if(lv != l) { |
| throw new Exception("Longs not equal:"+l+" : "+lv); |
| } |
| |
| return true; |
| } |
| |
| public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception { |
| String actual = (String)tuple.get(fieldName); |
| |
| if( (null == expected && null != actual) || |
| (null != expected && null == actual) || |
| (null != expected && !expected.equals(actual))){ |
| throw new Exception("Longs not equal:"+expected+" : "+actual); |
| } |
| |
| |
| |
| return true; |
| } |
| |
| } |