| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.client.solrj.io.stream; |
| |
| import java.io.*; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.zip.GZIPOutputStream; |
| |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.LuceneTestCase.Slow; |
| import org.apache.solr.SolrTestCaseJ4; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.embedded.JettySolrRunner; |
| import org.apache.solr.client.solrj.io.ClassificationEvaluation; |
| import org.apache.solr.client.solrj.io.SolrClientCache; |
| import org.apache.solr.client.solrj.io.Tuple; |
| import org.apache.solr.client.solrj.io.comp.ComparatorOrder; |
| import org.apache.solr.client.solrj.io.comp.FieldComparator; |
| import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; |
| import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; |
| import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; |
| import org.apache.solr.client.solrj.io.stream.metrics.*; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.cloud.SolrCloudTestCase; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.core.CoreDescriptor; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| @Slow |
| @SolrTestCaseJ4.SuppressSSL |
| @LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"}) |
| public class StreamExpressionTest extends SolrCloudTestCase { |
| |
  // Collection (or alias — chosen at random in setupCluster) that most tests index into.
  private static final String COLLECTIONORALIAS = "collection1";
  // Dedicated single-shard collection used only by the filestream() tests.
  private static final String FILESTREAM_COLLECTION = "filestream_collection";
  private static final int TIMEOUT = DEFAULT_TIMEOUT;
  // Name of the uniqueKey field used when building test documents.
  private static final String id = "id";

  // True when COLLECTIONORALIAS was registered as an alias rather than a concrete collection.
  private static boolean useAlias;
| |
  @BeforeClass
  public static void setupCluster() throws Exception {
    // 4-node cluster with two configsets: "conf" (streaming schema) and "ml" (machine learning).
    configureCluster(4)
        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
        .addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
        .configure();

    // Randomly exercise the alias code path: half the runs address the collection
    // through an alias, the other half address it directly.
    String collection;
    useAlias = random().nextBoolean();
    if (useAlias) {
      collection = COLLECTIONORALIAS + "_collection";
    } else {
      collection = COLLECTIONORALIAS;
    }

    // 2 shards x 1 replica; wait until all 2 replicas are active before tests run.
    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
    cluster.waitForActiveCollection(collection, 2, 2);
    if (useAlias) {
      CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
    }

    // Create a collection for use by the filestream() expression, and place some files there for it to read.
    CollectionAdminRequest.createCollection(FILESTREAM_COLLECTION, "conf", 1, 1).process(cluster.getSolrClient());
    cluster.waitForActiveCollection(FILESTREAM_COLLECTION, 1, 1);
    final Path dataDir = findUserFilesDataDir();
    populateFileStreamData(dataDir);
  }
| |
| @Before |
| public void cleanIndex() throws Exception { |
| new UpdateRequest() |
| .deleteByQuery("*:*") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| } |
| |
| @Test |
| public void testCloudSolrStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0") |
| .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamFactory factory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()); |
| StreamExpression expression; |
| CloudSolrStream stream; |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| |
| try { |
| // Basic test |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "a_i", 0); |
| |
| // Basic w/aliases |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "alias.a_i", 0); |
| assertString(tuples.get(0), "name", "hello0"); |
| |
| // Basic filtered test |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 3); |
| assertOrder(tuples, 0, 3, 4); |
| assertLong(tuples.get(1), "a_i", 3); |
| |
| try { |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| throw new Exception("Should be an exception here"); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains("q param expected for search function")); |
| } |
| |
| try { |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| throw new Exception("Should be an exception here"); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains("fl param expected for search function")); |
| } |
| |
| try { |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"blah\", fl=\"id, a_f\", sort=\"a_f\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| throw new Exception("Should be an exception here"); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains("Invalid sort spec")); |
| } |
| |
| // Test with shards param |
| |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| |
| Map<String, List<String>> shardsMap = new HashMap<>(); |
| shardsMap.put("myCollection", shardUrls); |
| StreamContext context = new StreamContext(); |
| context.put("shards", shardsMap); |
| context.setSolrClientCache(solrClientCache); |
| |
| // Basic test |
| expression = StreamExpressionParser.parse("search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "a_i", 0); |
| |
| |
| //Execersise the /stream hander |
| |
| //Add the shards http parameter for the myCollection |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "search(myCollection, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| solrParams.add("myCollection.shards", buf.toString()); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "a_i", 0); |
| |
| } finally { |
| solrClientCache.close(); |
| } |
| } |
| |
| @Test |
| public void testSearchFacadeStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0") |
| .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| |
| try { |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "sort(search("+COLLECTIONORALIAS+"), by=\"a_i asc\")"); |
| SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 1, 2, 3, 4); |
| assertLong(tuples.get(0), "a_i", 0); |
| assertDouble(tuples.get(0), "a_f", 0); |
| assertString(tuples.get(0), "a_s", "hello0"); |
| |
| assertLong(tuples.get(1), "a_i", 1); |
| assertDouble(tuples.get(1), "a_f", 1); |
| assertString(tuples.get(1), "a_s", "hello1"); |
| |
| assertLong(tuples.get(2), "a_i", 2); |
| assertDouble(tuples.get(2), "a_f", 0); |
| assertString(tuples.get(2), "a_s", "hello2"); |
| |
| assertLong(tuples.get(3), "a_i", 3); |
| assertDouble(tuples.get(3), "a_f", 3); |
| assertString(tuples.get(3), "a_s", "hello3"); |
| |
| assertLong(tuples.get(4), "a_i", 4); |
| assertDouble(tuples.get(4), "a_f", 4); |
| assertString(tuples.get(4), "a_s", "hello4"); |
| |
| } finally { |
| solrClientCache.close(); |
| } |
| } |
| |
| |
| @Test |
| public void testSqlStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0") |
| .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| |
| try { |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "sql("+COLLECTIONORALIAS+", stmt=\"select id from collection1 order by a_i asc\")"); |
| SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 1, 2, 3, 4); |
| |
| //Test with using the default collection |
| solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "sql(stmt=\"select id from collection1 order by a_i asc\")"); |
| solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 1, 2, 3, 4); |
| |
| } finally { |
| solrClientCache.close(); |
| } |
| } |
| |
| @Test |
| public void testCloudSolrStreamWithZkHost() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0") |
| .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamFactory factory = new StreamFactory(); |
| StreamExpression expression; |
| CloudSolrStream stream; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| List<Tuple> tuples; |
| |
| try { |
| // Basic test |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", zkHost=" + cluster.getZkServer().getZkAddress() + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "a_i", 0); |
| |
| // Basic w/aliases |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\", zkHost=" + cluster.getZkServer().getZkAddress() + ")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 5); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| assertLong(tuples.get(0), "alias.a_i", 0); |
| assertString(tuples.get(0), "name", "hello0"); |
| |
| // Basic filtered test |
| expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost=" |
| + cluster.getZkServer().getZkAddress() + ", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 3); |
| assertOrder(tuples, 0, 3, 4); |
| assertLong(tuples.get(1), "a_i", 3); |
| |
| |
| // Test a couple of multile field lists. |
| expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " + |
| "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0); |
| |
| |
| expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " + |
| "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2); |
| |
| } finally { |
| solrClientCache.close(); |
| } |
| |
| } |
| |
| @Test |
| public void testParameterSubstitution() throws Exception { |
| String oldVal = System.getProperty("StreamingExpressionMacros", "false"); |
| System.setProperty("StreamingExpressionMacros", "true"); |
| try { |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0") |
| .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; |
| List<Tuple> tuples; |
| TupleStream stream; |
| |
| // Basic test |
| ModifiableSolrParams sParams = new ModifiableSolrParams(); |
| sParams.set("expr", "merge(" |
| + "${q1}," |
| + "${q2}," |
| + "on=${mySort})"); |
| sParams.set(CommonParams.QT, "/stream"); |
| sParams.set("q1", "search(" + COLLECTIONORALIAS + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})"); |
| sParams.set("q2", "search(" + COLLECTIONORALIAS + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})"); |
| sParams.set("mySort", "a_f asc"); |
| stream = new SolrStream(url, sParams); |
| tuples = getTuples(stream); |
| |
| assertEquals(4, tuples.size()); |
| assertOrder(tuples, 0, 1, 3, 4); |
| |
| // Basic test desc |
| sParams.set("mySort", "a_f desc"); |
| stream = new SolrStream(url, sParams); |
| tuples = getTuples(stream); |
| |
| assertEquals(4, tuples.size()); |
| assertOrder(tuples, 4, 3, 1, 0); |
| |
| // Basic w/ multi comp |
| sParams.set("q2", "search(" + COLLECTIONORALIAS + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})"); |
| sParams.set("mySort", "\"a_f asc, a_s asc\""); |
| stream = new SolrStream(url, sParams); |
| tuples = getTuples(stream); |
| |
| assertEquals(5, tuples.size()); |
| assertOrder(tuples, 0, 2, 1, 3, 4); |
| } finally { |
| System.setProperty("StreamingExpressionMacros", oldVal); |
| } |
| } |
| |
| |
  /**
   * Verifies null/missing-field handling over the /export handler: absent fields come
   * back as null tuples values, and null strings sort to the top ascending / bottom
   * descending.
   */
  @Test
  public void testNulls() throws Exception {

    // Doc 0 has no a_s; doc 4 has no a_i; only doc 0 has the multiValued fields.
    new UpdateRequest()
        .add(id, "0", "a_i", "1", "a_f", "0", "s_multi", "aaa", "s_multi", "bbb", "i_multi", "100", "i_multi", "200")
        .add(id, "2", "a_s", "hello2", "a_i", "3", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "4", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "2", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamExpression expression;
    TupleStream stream;
    List<Tuple> tuples;
    Tuple tuple;
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
        .withFunctionName("search", CloudSolrStream.class);
    try {
      // Basic test: sort by a_i asc; doc 4 (missing a_i) sorts first.
      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_i asc\")");
      stream = new CloudSolrStream(expression, factory);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assert (tuples.size() == 5);
      assertOrder(tuples, 4, 0, 1, 2, 3);

      // Doc 4: missing a_i and multiValued fields must all read back as null.
      tuple = tuples.get(0);
      assertTrue("hello4".equals(tuple.getString("a_s")));
      assertNull(tuple.get("s_multi"));
      assertNull(tuple.get("i_multi"));
      assertNull(tuple.getLong("a_i"));


      // Doc 0: missing a_s is null, but the multiValued fields are populated lists.
      tuple = tuples.get(1);
      assertNull(tuple.get("a_s"));
      List<String> strings = tuple.getStrings("s_multi");
      assertNotNull(strings);
      assertEquals("aaa", strings.get(0));
      assertEquals("bbb", strings.get(1));
      List<Long> longs = tuple.getLongs("i_multi");
      assertNotNull(longs);

      //test sort (asc) with null string field. Null should sort to the top.
      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s asc\")");
      stream = new CloudSolrStream(expression, factory);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assert (tuples.size() == 5);
      assertOrder(tuples, 0, 1, 2, 3, 4);

      //test sort(desc) with null string field. Null should sort to the bottom.
      expression = StreamExpressionParser.parse("search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f, s_multi, i_multi\", qt=\"/export\", sort=\"a_s desc\")");
      stream = new CloudSolrStream(expression, factory);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assert (tuples.size() == 5);
      assertOrder(tuples, 4, 3, 2, 1, 0);
    } finally {
      solrClientCache.close();
    }
  }
| |
| |
| |
  /**
   * Verifies the random() expression: it must return documents in a randomized order
   * (two runs produce different orderings) while still returning the same set of
   * documents, honor the rows param (including rows greater than the doc count, which
   * routes through DeepRandomStream), and default to 500 rows with an "x" ordinal
   * field when no params are given.
   */
  @Test
  public void testRandomStream() throws Exception {

    // Index 1000 docs so random orderings are overwhelmingly unlikely to collide.
    UpdateRequest update = new UpdateRequest();
    for(int idx = 0; idx < 1000; ++idx){
      String idxString = Integer.toString(idx);
      update.add(id,idxString, "a_s", "hello" + idxString, "a_i", idxString, "a_f", idxString);
    }
    update.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamExpression expression;
    TupleStream stream;

    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
        .withFunctionName("random", RandomFacadeStream.class);


    StreamContext context = new StreamContext();
    SolrClientCache cache = new SolrClientCache();
    try {
      context.setSolrClientCache(cache);

      expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
      stream = factory.constructStream(expression);
      stream.setStreamContext(context);
      List<Tuple> tuples1 = getTuples(stream);
      assert (tuples1.size() == 1000);

      expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
      stream = factory.constructStream(expression);
      stream.setStreamContext(context);
      List<Tuple> tuples2 = getTuples(stream);
      assert (tuples2.size() == 1000);


      // The two runs must differ in at least one position, proving the order is random.
      boolean different = false;
      for (int i = 0; i < tuples1.size(); i++) {
        Tuple tuple1 = tuples1.get(i);
        Tuple tuple2 = tuples2.get(i);
        if (!tuple1.get("id").equals(tuple2.get(id))) {
          different = true;
          break;
        }
      }

      assertTrue(different);

      // After sorting by id, the two runs should contain the same documents.
      Collections.sort(tuples1, new FieldComparator("id", ComparatorOrder.ASCENDING));
      Collections.sort(tuples2, new FieldComparator("id", ComparatorOrder.ASCENDING));

      for (int i = 0; i < tuples1.size(); i++) {
        Tuple tuple1 = tuples1.get(i);
        Tuple tuple2 = tuples2.get(i);
        // NOTE(review): this inner assert compares tuple1's id against tuple2's a_i and
        // only fires when the ids differ — it looks like it was meant to assert the
        // sorted id lists are equal; confirm the intended check.
        if (!tuple1.get("id").equals(tuple2.get(id))) {
          assert(tuple1.getLong("id").equals(tuple2.get("a_i")));
        }
      }

      expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
      stream = factory.constructStream(expression);
      stream.setStreamContext(context);
      List<Tuple> tuples3 = getTuples(stream);
      assert (tuples3.size() == 1);

      //Exercise the DeepRandomStream with higher rows

      // rows > doc count: all 1000 docs come back, still in random order.
      expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"10001\", fl=\"id, a_i\")");
      stream = factory.constructStream(expression);
      stream.setStreamContext(context);
      List<Tuple> tuples10 = getTuples(stream);
      assert (tuples10.size() == 1000);

      expression = StreamExpressionParser.parse("random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"10001\", fl=\"id, a_i\")");
      stream = factory.constructStream(expression);
      stream.setStreamContext(context);
      List<Tuple> tuples11 = getTuples(stream);
      assert (tuples11.size() == 1000);

      different = false;
      for (int i = 0; i < tuples10.size(); i++) {
        Tuple tuple1 = tuples10.get(i);
        Tuple tuple2 = tuples11.get(i);
        if (!tuple1.get("id").equals(tuple2.get(id))) {
          different = true;
          break;
        }
      }

      assertTrue(different);

      Collections.sort(tuples10, new FieldComparator("id", ComparatorOrder.ASCENDING));
      Collections.sort(tuples11, new FieldComparator("id", ComparatorOrder.ASCENDING));

      for (int i = 0; i < tuples10.size(); i++) {
        Tuple tuple1 = tuples10.get(i);
        Tuple tuple2 = tuples11.get(i);
        // NOTE(review): same questionable id-vs-a_i comparison as above — verify intent.
        if (!tuple1.get("id").equals(tuple2.get(id))) {
          assert(tuple1.getLong("id").equals(tuple2.get("a_i")));
        }
      }



      //Exercise the /stream handler
      ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream"));
      sParams.add("expr", "random(" + COLLECTIONORALIAS + ", q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
      JettySolrRunner jetty = cluster.getJettySolrRunner(0);
      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
      List<Tuple> tuples4 = getTuples(solrStream);
      assert (tuples4.size() == 1);
      //Assert no x-axis
      assertNull(tuples4.get(0).get("x"));


      // No params: defaults kick in — 500 rows, all stored fields, plus an "x" ordinal.
      sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream"));
      sParams.add("expr", "random(" + COLLECTIONORALIAS + ")");
      jetty = cluster.getJettySolrRunner(0);
      solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
      tuples4 = getTuples(solrStream);
      assert(tuples4.size() == 500);
      @SuppressWarnings({"rawtypes"})
      Map fields = tuples4.get(0).getFields();
      assert(fields.containsKey("id"));
      assert(fields.containsKey("a_f"));
      assert(fields.containsKey("a_i"));
      assert(fields.containsKey("a_s"));
      //Assert the x-axis: sequential 0..499 ordinals.
      for(int i=0; i<tuples4.size(); i++) {
        assertEquals(tuples4.get(i).getLong("x").longValue(), i);
      }

    } finally {
      cache.close();
    }
  }
| |
| @Test |
| public void testKnnSearchStream() throws Exception { |
| |
| UpdateRequest update = new UpdateRequest(); |
| update.add(id, "1", "a_t", "hello world have a very nice day blah"); |
| update.add(id, "4", "a_t", "hello world have a very streaming is fun"); |
| update.add(id, "3", "a_t", "hello world have a very nice bug out"); |
| update.add(id, "2", "a_t", "hello world have a very nice day fancy sky"); |
| update.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamContext context = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| try { |
| context.setSolrClientCache(cache); |
| ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); |
| sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", rows=\"4\", fl=\"id, score\", mintf=\"1\")"); |
| JettySolrRunner jetty = cluster.getJettySolrRunner(0); |
| SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 3); |
| assertOrder(tuples, 2, 3, 4); |
| |
| sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); |
| sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", k=\"2\", fl=\"id, score\", mintf=\"1\")"); |
| solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 2); |
| assertOrder(tuples, 2, 3); |
| |
| sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); |
| sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", rows=\"4\", fl=\"id, score\", mintf=\"1\", maxdf=\"0\")"); |
| solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 0); |
| |
| sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); |
| sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", rows=\"4\", fl=\"id, score\", mintf=\"1\", maxwl=\"1\")"); |
| solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 0); |
| |
| sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream")); |
| sParams.add("expr", "knnSearch(" + COLLECTIONORALIAS + ", id=\"1\", qf=\"a_t\", rows=\"2\", fl=\"id, score\", mintf=\"1\", minwl=\"20\")"); |
| solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 0); |
| |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| @Test |
| public void testStatsStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") |
| .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") |
| .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6") |
| .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7") |
| .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8") |
| .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") |
| .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) |
| .withFunctionName("stats", StatsStream.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("std", StdMetric.class) |
| .withFunctionName("per", PercentileMetric.class) |
| .withFunctionName("countDist", CountDistinctMetric.class); |
| |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| try { |
| streamContext.setSolrClientCache(cache); |
| String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), std(a_i), std(a_f), per(a_i, 50), per(a_f, 50), countDist(a_s), count(*))"; |
| expression = StreamExpressionParser.parse(expr); |
| stream = factory.constructStream(expression); |
| stream.setStreamContext(streamContext); |
| |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 1); |
| |
| //Test Long and Double Sums |
| |
| Tuple tuple = tuples.get(0); |
| |
| Double sumi = tuple.getDouble("sum(a_i)"); |
| Double sumf = tuple.getDouble("sum(a_f)"); |
| Double mini = tuple.getDouble("min(a_i)"); |
| Double minf = tuple.getDouble("min(a_f)"); |
| Double maxi = tuple.getDouble("max(a_i)"); |
| Double maxf = tuple.getDouble("max(a_f)"); |
| Double avgi = tuple.getDouble("avg(a_i)"); |
| Double avgf = tuple.getDouble("avg(a_f)"); |
| Double stdi = tuple.getDouble("std(a_i)"); |
| Double stdf = tuple.getDouble("std(a_f)"); |
| Double peri = tuple.getDouble("per(a_i,50)"); |
| Double perf = tuple.getDouble("per(a_f,50)"); |
| Double count = tuple.getDouble("count(*)"); |
| Long countDist = tuple.getLong("countDist(a_s)"); |
| |
| |
| assertTrue(sumi.longValue() == 70); |
| assertTrue(sumf.doubleValue() == 55.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 7.0D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(stdi.doubleValue() == 5.196152422706632); |
| assertTrue(stdf.doubleValue() == 2.8722813232690143); |
| assertTrue(peri.doubleValue() == 7.0D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 10); |
| assertEquals(countDist.longValue(), 3L); |
| |
| |
| |
| //Test without query |
| |
| expr = "stats(" + COLLECTIONORALIAS + ", sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), std(a_i), std(a_f), per(a_i, 50), per(a_f, 50), count(*))"; |
| expression = StreamExpressionParser.parse(expr); |
| stream = factory.constructStream(expression); |
| stream.setStreamContext(streamContext); |
| |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 1); |
| |
| //Test Long and Double Sums |
| |
| tuple = tuples.get(0); |
| |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(sumi.longValue() == 70); |
| assertTrue(sumf.doubleValue() == 55.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 7.0D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(stdi.doubleValue() == 5.196152422706632); |
| assertTrue(stdf.doubleValue() == 2.8722813232690143); |
| assertTrue(peri.doubleValue() == 7.0D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 10); |
| |
| //Test with shards parameter |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| expr = "stats(myCollection, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), std(a_i), std(a_f), per(a_i, 50), per(a_f, 50), count(*))"; |
| Map<String, List<String>> shardsMap = new HashMap<>(); |
| shardsMap.put("myCollection", shardUrls); |
| StreamContext context = new StreamContext(); |
| context.put("shards", shardsMap); |
| context.setSolrClientCache(cache); |
| stream = factory.constructStream(expr); |
| stream.setStreamContext(context); |
| |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 1); |
| |
| //Test Long and Double Sums |
| |
| tuple = tuples.get(0); |
| |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(sumi.longValue() == 70); |
| assertTrue(sumf.doubleValue() == 55.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 7.0D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(stdi.doubleValue() == 5.196152422706632); |
| assertTrue(stdf.doubleValue() == 2.8722813232690143); |
| assertTrue(peri.doubleValue() == 7.0D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 10); |
| |
| //Execersise the /stream hander |
| |
| //Add the shards http parameter for the myCollection |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", expr); |
| solrParams.add("myCollection.shards", buf.toString()); |
| SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 1); |
| |
| tuple =tuples.get(0); |
| |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(sumi.longValue() == 70); |
| assertTrue(sumf.doubleValue() == 55.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 7.0D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 10); |
| //Add a negative test to prove that it cannot find slices if shards parameter is removed |
| |
| try { |
| ModifiableSolrParams solrParamsBad = new ModifiableSolrParams(); |
| solrParamsBad.add("qt", "/stream"); |
| solrParamsBad.add("expr", expr); |
| solrStream = new SolrStream(shardUrls.get(0), solrParamsBad); |
| tuples = getTuples(solrStream); |
| throw new Exception("Exception should have been thrown above"); |
| } catch (IOException e) { |
| assertTrue(e.getMessage().contains("Collection not found: myCollection")); |
| } |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| @Test |
| public void testFacet2DStream() throws Exception { |
| new UpdateRequest() |
| .add(id, "0", "diseases_s", "stroke", "symptoms_s", "confusion", "cases_i", "10") |
| .add(id, "1", "diseases_s", "cancer", "symptoms_s", "indigestion","cases_i", "5" ) |
| .add(id, "2", "diseases_s", "diabetes", "symptoms_s", "thirsty", "cases_i", "20") |
| .add(id, "3", "diseases_s", "stroke", "symptoms_s", "confusion", "cases_i", "10") |
| .add(id, "4", "diseases_s", "bronchus", "symptoms_s", "nausea", "cases_i", "25") |
| .add(id, "5", "diseases_s", "bronchus", "symptoms_s", "cough", "cases_i", "10") |
| .add(id, "6", "diseases_s", "bronchus", "symptoms_s", "cough", "cases_i", "10") |
| .add(id, "7", "diseases_s", "heart attack", "symptoms_s", "indigestion", "cases_i", "5") |
| .add(id, "8", "diseases_s", "diabetes", "symptoms_s", "urination", "cases_i", "10") |
| .add(id, "9", "diseases_s", "diabetes", "symptoms_s", "thirsty", "cases_i", "20") |
| .add(id, "10", "diseases_s", "diabetes", "symptoms_s", "thirsty", "cases_i", "20") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| String expr = "facet2D(collection1, q=\"*:*\", x=\"diseases_s\", y=\"symptoms_s\", dimensions=\"3,1\", count(*))"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| assertEquals(tuples.size(), 3); |
| |
| Tuple tuple1 = tuples.get(0); |
| assertEquals(tuple1.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple1.getString("symptoms_s"), "thirsty"); |
| assertEquals(tuple1.getLong("count(*)").longValue(), 3); |
| |
| Tuple tuple2 = tuples.get(1); |
| assertEquals(tuple2.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple2.getString("symptoms_s"), "cough"); |
| assertEquals(tuple2.getLong("count(*)").longValue(), 2); |
| |
| Tuple tuple3 = tuples.get(2); |
| assertEquals(tuple3.getString("diseases_s"), "stroke"); |
| assertEquals(tuple3.getString("symptoms_s"), "confusion"); |
| assertEquals(tuple3.getLong("count(*)").longValue(), 2); |
| |
| |
| paramsLoc = new ModifiableSolrParams(); |
| expr = "facet2D(collection1, x=\"diseases_s\", y=\"symptoms_s\", dimensions=\"3,1\")"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| assertEquals(tuples.size(), 3); |
| |
| tuple1 = tuples.get(0); |
| assertEquals(tuple1.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple1.getString("symptoms_s"), "thirsty"); |
| assertEquals(tuple1.getString("count(*)"), "3"); |
| |
| tuple2 = tuples.get(1); |
| assertEquals(tuple2.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple2.getString("symptoms_s"), "cough"); |
| assertEquals(tuple2.getString("count(*)"), "2"); |
| |
| tuple3 = tuples.get(2); |
| assertEquals(tuple3.getString("diseases_s"), "stroke"); |
| assertEquals(tuple3.getString("symptoms_s"), "confusion"); |
| assertEquals(tuple3.getString("count(*)"), "2"); |
| |
| paramsLoc = new ModifiableSolrParams(); |
| expr = "facet2D(collection1, q=\"*:*\", x=\"diseases_s\", y=\"symptoms_s\", dimensions=\"3,1\", sum(cases_i))"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| assertEquals(tuples.size(), 3); |
| |
| |
| tuple1 = tuples.get(0); |
| assertEquals(tuple1.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple1.getString("symptoms_s"), "thirsty"); |
| assertEquals(tuple1.getLong("sum(cases_i)").longValue(), 60L); |
| |
| tuple2 = tuples.get(1); |
| assertEquals(tuple2.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple2.getString("symptoms_s"), "nausea"); |
| assertEquals(tuple2.getLong("sum(cases_i)").longValue(), 25L); |
| |
| |
| tuple3 = tuples.get(2); |
| assertEquals(tuple3.getString("diseases_s"), "stroke"); |
| assertEquals(tuple3.getString("symptoms_s"), "confusion"); |
| assertEquals(tuple3.getLong("sum(cases_i)").longValue(), 20L); |
| |
| |
| paramsLoc = new ModifiableSolrParams(); |
| expr = "facet2D(collection1, q=\"*:*\", x=\"diseases_s\", y=\"symptoms_s\", dimensions=\"3,1\", avg(cases_i))"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| assertEquals(tuples.size(), 3); |
| |
| tuple1 = tuples.get(0); |
| assertEquals(tuple1.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple1.getString("symptoms_s"), "thirsty"); |
| assertEquals(tuple1.getLong("avg(cases_i)").longValue(), 20); |
| |
| tuple2 = tuples.get(1); |
| assertEquals(tuple2.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple2.getString("symptoms_s"), "nausea"); |
| assertEquals(tuple2.getLong("avg(cases_i)").longValue(), 25); |
| |
| tuple3 = tuples.get(2); |
| assertEquals(tuple3.getString("diseases_s"), "stroke"); |
| assertEquals(tuple3.getString("symptoms_s"), "confusion"); |
| assertEquals(tuple3.getLong("avg(cases_i)").longValue(), 10); |
| |
| paramsLoc = new ModifiableSolrParams(); |
| expr = "facet2D(collection1, q=\"*:*\", x=\"diseases_s\", y=\"symptoms_s\", dimensions=\"2,2\")"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| assertEquals(tuples.size(), 4); |
| |
| tuple1 = tuples.get(0); |
| assertEquals(tuple1.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple1.getString("symptoms_s"), "thirsty"); |
| assertEquals(tuple1.getLong("count(*)").longValue(), 3); |
| |
| tuple2 = tuples.get(1); |
| assertEquals(tuple2.getString("diseases_s"), "diabetes"); |
| assertEquals(tuple2.getString("symptoms_s"), "urination"); |
| assertEquals(tuple2.getLong("count(*)").longValue(), 1); |
| |
| |
| tuple3 = tuples.get(2); |
| assertEquals(tuple3.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple3.getString("symptoms_s"), "cough"); |
| assertEquals(tuple3.getLong("count(*)").longValue(), 2); |
| |
| Tuple tuple4 = tuples.get(3); |
| assertEquals(tuple4.getString("diseases_s"), "bronchus"); |
| assertEquals(tuple4.getString("symptoms_s"), "nausea"); |
| assertEquals(tuple4.getLong("count(*)").longValue(), 1); |
| } |
| |
| |
| @Test |
| public void testDrillStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") |
| .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") |
| .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6") |
| .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7") |
| .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8") |
| .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") |
| .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| List<Tuple> tuples; |
| |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| String expr = "rollup(select(drill(" |
| + " collection1, " |
| + " q=\"*:*\", " |
| + " fl=\"a_s, a_f\", " |
| + " sort=\"a_s desc\", " |
| + " rollup(input(), over=\"a_s\", count(*), sum(a_f)))," + |
| " a_s, count(*) as cnt, sum(a_f) as saf)," + |
| " over=\"a_s\"," + |
| " sum(cnt), sum(saf)" |
| + ")"; |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| |
| Tuple tuple = tuples.get(0); |
| String bucket = tuple.getString("a_s"); |
| |
| Double count = tuple.getDouble("sum(cnt)"); |
| Double saf = tuple.getDouble("sum(saf)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertEquals(count.doubleValue(), 2, 0); |
| assertEquals(saf.doubleValue(), 11, 0); |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| count = tuple.getDouble("sum(cnt)"); |
| saf = tuple.getDouble("sum(saf)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertEquals(count.doubleValue(), 4, 0); |
| assertEquals(saf.doubleValue(), 26, 0); |
| |
| tuple = tuples.get(2); |
| bucket = tuple.getString("a_s"); |
| count = tuple.getDouble("sum(cnt)"); |
| saf = tuple.getDouble("sum(saf)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(count.doubleValue() == 4); |
| assertEquals(saf.doubleValue(), 18, 0); |
| |
| } |
| |
| @Test |
| public void testFacetStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") |
| .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") |
| .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") |
| .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6") |
| .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7") |
| .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8") |
| .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") |
| .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String clause; |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("facet", FacetStream.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("std", StdMetric.class) |
| .withFunctionName("per", PercentileMetric.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("countDist", CountDistinctMetric.class); |
| |
| // Basic test |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"sum(a_i) asc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "std(a_i), std(a_f)," |
| + "per(a_i, 50), per(a_f, 50)," |
| + "count(*), countDist(a_i)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 3); |
| |
| |
| Tuple tuple = tuples.get(0); |
| String bucket = tuple.getString("a_s"); |
| Double sumi = tuple.getDouble("sum(a_i)"); |
| Double sumf = tuple.getDouble("sum(a_f)"); |
| Double mini = tuple.getDouble("min(a_i)"); |
| Double minf = tuple.getDouble("min(a_f)"); |
| Double maxi = tuple.getDouble("max(a_i)"); |
| Double maxf = tuple.getDouble("max(a_f)"); |
| Double avgi = tuple.getDouble("avg(a_i)"); |
| Double avgf = tuple.getDouble("avg(a_f)"); |
| Double stdi = tuple.getDouble("std(a_i)"); |
| Double stdf = tuple.getDouble("std(a_f)"); |
| Double peri = tuple.getDouble("per(a_i,50)"); |
| Double perf = tuple.getDouble("per(a_f,50)"); |
| Long countDist = tuple.getLong("countDist(a_i)"); |
| Double count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(sumf.doubleValue() == 11.0D); |
| assertTrue(mini.doubleValue() == 4.0D); |
| assertTrue(minf.doubleValue() == 4.0D); |
| assertTrue(maxi.doubleValue() == 11.0D); |
| assertTrue(maxf.doubleValue() == 7.0D); |
| assertTrue(avgi.doubleValue() == 7.5D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 2); |
| assertTrue(stdi.doubleValue() == 3.5D); |
| assertTrue(stdf.doubleValue() == 1.5D); |
| assertTrue(peri.doubleValue() == 7.5D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| assertEquals(countDist.longValue(), 2); |
| |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| countDist = tuple.getLong("countDist(a_i)"); |
| |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 5.673402858955108D); |
| assertTrue(stdf.doubleValue() == 3.5D); |
| assertTrue(peri.doubleValue() == 1.5D); |
| assertTrue(perf.doubleValue() == 3.5D); |
| assertEquals(countDist.longValue(), 4); |
| |
| |
| tuple = tuples.get(2); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| countDist = tuple.getLong("countDist(a_i)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertTrue(sumi.doubleValue() == 38.0D); |
| assertTrue(sumf.doubleValue() == 26.0D); |
| assertTrue(mini.doubleValue() == 3.0D); |
| assertTrue(minf.doubleValue() == 3.0D); |
| assertTrue(maxi.doubleValue() == 13.0D); |
| assertTrue(maxf.doubleValue() == 9.0D); |
| assertTrue(avgi.doubleValue() == 9.5D); |
| assertTrue(avgf.doubleValue() == 6.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 3.905124837953327D); |
| assertTrue(stdf.doubleValue() == 2.29128784747792D); |
| assertTrue(peri.doubleValue() == 11.0D); |
| assertTrue(perf.doubleValue() == 7.0D); |
| assertEquals(countDist.longValue(), 4); |
| |
| //Reverse the Sort. |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"sum(a_i) desc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "std(a_i), std(a_f)," |
| + "per(a_i, 50), per(a_f, 50)," |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| |
| //Test Long and Double Sums |
| |
| tuple = tuples.get(0); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertTrue(sumi.doubleValue() == 38.0D); |
| assertTrue(sumf.doubleValue() == 26.0D); |
| assertTrue(mini.doubleValue() == 3.0D); |
| assertTrue(minf.doubleValue() == 3.0D); |
| assertTrue(maxi.doubleValue() == 13.0D); |
| assertTrue(maxf.doubleValue() == 9.0D); |
| assertTrue(avgi.doubleValue() == 9.5D); |
| assertTrue(avgf.doubleValue() == 6.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 3.905124837953327D); |
| assertTrue(stdf.doubleValue() == 2.29128784747792D); |
| assertTrue(peri.doubleValue() == 11.0D); |
| assertTrue(perf.doubleValue() == 7.0D); |
| |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 5.673402858955108D); |
| assertTrue(stdf.doubleValue() == 3.5D); |
| assertTrue(peri.doubleValue() == 1.5D); |
| assertTrue(perf.doubleValue() == 3.5D); |
| |
| tuple = tuples.get(2); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(sumf.doubleValue() == 11.0D); |
| assertTrue(mini.doubleValue() == 4.0D); |
| assertTrue(minf.doubleValue() == 4.0D); |
| assertTrue(maxi.doubleValue() == 11.0D); |
| assertTrue(maxf.doubleValue() == 7.0D); |
| assertTrue(avgi.doubleValue() == 7.5D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 2); |
| assertTrue(stdi.doubleValue() == 3.5D); |
| assertTrue(stdf.doubleValue() == 1.5D); |
| assertTrue(peri.doubleValue() == 7.5D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"sum(a_i) desc\", " |
| + "rows=2, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| |
| //Test rows |
| |
| tuple = tuples.get(0); |
| assertEquals(tuples.size(), 2); |
| |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertTrue(sumi.doubleValue() == 38.0D); |
| assertTrue(sumf.doubleValue() == 26.0D); |
| assertTrue(mini.doubleValue() == 3.0D); |
| assertTrue(minf.doubleValue() == 3.0D); |
| assertTrue(maxi.doubleValue() == 13.0D); |
| assertTrue(maxf.doubleValue() == 9.0D); |
| assertTrue(avgi.doubleValue() == 9.5D); |
| assertTrue(avgf.doubleValue() == 6.5D); |
| assertTrue(count.doubleValue() == 4); |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"sum(a_i) desc\", " |
| + "rows=2, offset=1, method=dvhash, refine=true," |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| |
| //Test offset |
| |
| tuple = tuples.get(0); |
| assertEquals(tuples.size(), 2); |
| |
| tuple = tuples.get(0); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(sumf.doubleValue() == 11.0D); |
| assertTrue(mini.doubleValue() == 4.0D); |
| assertTrue(minf.doubleValue() == 4.0D); |
| assertTrue(maxi.doubleValue() == 11.0D); |
| assertTrue(maxf.doubleValue() == 7.0D); |
| assertTrue(avgi.doubleValue() == 7.5D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 2); |
| |
| |
| //Test index sort |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"a_s desc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "std(a_i), std(a_f)," |
| + "per(a_i, 50), per(a_f, 50)," |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 3); |
| |
| |
| tuple = tuples.get(0); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(sumf.doubleValue() == 11.0D); |
| assertTrue(mini.doubleValue() == 4.0D); |
| assertTrue(minf.doubleValue() == 4.0D); |
| assertTrue(maxi.doubleValue() == 11.0D); |
| assertTrue(maxf.doubleValue() == 7.0D); |
| assertTrue(avgi.doubleValue() == 7.5D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 2); |
| assertTrue(stdi.doubleValue() == 3.5D); |
| assertTrue(stdf.doubleValue() == 1.5D); |
| assertTrue(peri.doubleValue() == 7.5D); |
| assertTrue(perf.doubleValue() == 5.5D); |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertTrue(sumi.doubleValue() == 38.0D); |
| assertTrue(sumf.doubleValue() == 26.0D); |
| assertTrue(mini.doubleValue() == 3.0D); |
| assertTrue(minf.doubleValue() == 3.0D); |
| assertTrue(maxi.doubleValue() == 13.0D); |
| assertTrue(maxf.doubleValue() == 9.0D); |
| assertTrue(avgi.doubleValue() == 9.5D); |
| assertTrue(avgf.doubleValue() == 6.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 3.905124837953327D); |
| assertTrue(stdf.doubleValue() == 2.29128784747792D); |
| assertTrue(peri.doubleValue() == 11.0D); |
| assertTrue(perf.doubleValue() == 7.0D); |
| |
| |
| tuple = tuples.get(2); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| stdi = tuple.getDouble("std(a_i)"); |
| stdf = tuple.getDouble("std(a_f)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| perf = tuple.getDouble("per(a_f,50)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| assertTrue(stdi.doubleValue() == 5.673402858955108D); |
| assertTrue(stdf.doubleValue() == 3.5D); |
| assertTrue(peri.doubleValue() == 1.5D); |
| assertTrue(perf.doubleValue() == 3.5D); |
| |
| |
| //Test index sort |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"a_s asc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 3); |
| |
| |
| tuple = tuples.get(0); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello0")); |
| assertTrue(sumi.doubleValue() == 17.0D); |
| assertTrue(sumf.doubleValue() == 18.0D); |
| assertTrue(mini.doubleValue() == 0.0D); |
| assertTrue(minf.doubleValue() == 1.0D); |
| assertTrue(maxi.doubleValue() == 14.0D); |
| assertTrue(maxf.doubleValue() == 10.0D); |
| assertTrue(avgi.doubleValue() == 4.25D); |
| assertTrue(avgf.doubleValue() == 4.5D); |
| assertTrue(count.doubleValue() == 4); |
| |
| |
| tuple = tuples.get(1); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello3")); |
| assertTrue(sumi.doubleValue() == 38.0D); |
| assertTrue(sumf.doubleValue() == 26.0D); |
| assertTrue(mini.doubleValue() == 3.0D); |
| assertTrue(minf.doubleValue() == 3.0D); |
| assertTrue(maxi.doubleValue() == 13.0D); |
| assertTrue(maxf.doubleValue() == 9.0D); |
| assertTrue(avgi.doubleValue() == 9.5D); |
| assertTrue(avgf.doubleValue() == 6.5D); |
| assertTrue(count.doubleValue() == 4); |
| |
| |
| tuple = tuples.get(2); |
| bucket = tuple.getString("a_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| sumf = tuple.getDouble("sum(a_f)"); |
| mini = tuple.getDouble("min(a_i)"); |
| minf = tuple.getDouble("min(a_f)"); |
| maxi = tuple.getDouble("max(a_i)"); |
| maxf = tuple.getDouble("max(a_f)"); |
| avgi = tuple.getDouble("avg(a_i)"); |
| avgf = tuple.getDouble("avg(a_f)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket.equals("hello4")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(sumf.doubleValue() == 11.0D); |
| assertTrue(mini.doubleValue() == 4.0D); |
| assertTrue(minf.doubleValue() == 4.0D); |
| assertTrue(maxi.doubleValue() == 11.0D); |
| assertTrue(maxf.doubleValue() == 7.0D); |
| assertTrue(avgi.doubleValue() == 7.5D); |
| assertTrue(avgf.doubleValue() == 5.5D); |
| assertTrue(count.doubleValue() == 2); |
| |
| //Test zero result facets |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"blahhh\", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"a_s asc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 0); |
| |
| } |
| |
| @Test |
| public void testMultiCollection() throws Exception { |
| |
| CollectionAdminRequest.createCollection("collection2", "conf", 2, 1).process(cluster.getSolrClient()); |
| cluster.waitForActiveCollection("collection2", 2, 2); |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7") |
| .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77") |
| .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777") |
| .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777") |
| .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777") |
| .commit(cluster.getSolrClient(), "collection1"); |
| |
| new UpdateRequest() |
| .add(id, "10", "a_s", "hello", "a_i", "10", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7") |
| .add(id, "12", "a_s", "hello", "a_i", "12", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77") |
| .add(id, "13", "a_s", "hello", "a_i", "13", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777") |
| .add(id, "14", "a_s", "hello", "a_i", "14", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777") |
| .add(id, "11", "a_s", "hello", "a_i", "11", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777") |
| .commit(cluster.getSolrClient(), "collection2"); |
| |
| |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| |
| try { |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", rows=50, sort=\"a_i asc\")"); |
| SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 10); |
| assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14); |
| |
| //Test with export handler, different code path. |
| |
| solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\")"); |
| solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 10); |
| assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14); |
| |
| |
| solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "facet(\"collection1, collection2\", q=\"*:*\", buckets=\"a_s\", bucketSorts=\"count(*) asc\", count(*))"); |
| solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 1); |
| Tuple tuple = tuples.get(0); |
| assertEquals(tuple.getString("a_s"), "hello"); |
| assertEquals(tuple.getLong("count(*)").longValue(), 10); |
| |
| String expr = "timeseries(\"collection1, collection2\", q=\"*:*\", " + |
| "start=\"2016-01-01T01:00:00.000Z\", " + |
| "end=\"2016-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "format=\"yyyy\","+ |
| "count(*))"; |
| |
| solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", expr); |
| solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 1); |
| tuple = tuples.get(0); |
| assertEquals(tuple.getString("test_dt"), "2016"); |
| assertEquals(tuple.getLong("count(*)").longValue(), 10); |
| |
| //Test parallel |
| |
| solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", "parallel(collection1, sort=\"a_i asc\", workers=2, search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\", partitionKeys=\"a_s\"))"); |
| solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| solrStream.setStreamContext(streamContext); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 10); |
| assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14); |
| |
| } finally { |
| CollectionAdminRequest.deleteCollection("collection2").process(cluster.getSolrClient()); |
| solrClientCache.close(); |
| } |
| |
| |
| } |
| |
| @Test |
| public void testSubFacetStream() throws Exception { |
| |
| new UpdateRequest() |
| .add(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1") |
| .add(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2") |
| .add(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3") |
| .add(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4") |
| .add(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5") |
| .add(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6") |
| .add(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7") |
| .add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8") |
| .add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9") |
| .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String clause; |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("facet", FacetStream.class) |
| .withFunctionName("sum", SumMetric.class) |
| .withFunctionName("min", MinMetric.class) |
| .withFunctionName("max", MaxMetric.class) |
| .withFunctionName("avg", MeanMetric.class) |
| .withFunctionName("count", CountMetric.class) |
| .withFunctionName("std", StdMetric.class) |
| .withFunctionName("per", PercentileMetric.class); |
| |
| // Basic test |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "buckets=\"level1_s, level2_s\", " |
| + "bucketSorts=\"sum(a_i) desc, sum(a_i) desc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 6); |
| |
| Tuple tuple = tuples.get(0); |
| String bucket1 = tuple.getString("level1_s"); |
| String bucket2 = tuple.getString("level2_s"); |
| Double sumi = tuple.getDouble("sum(a_i)"); |
| Double count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 35); |
| assertTrue(count.doubleValue() == 3); |
| |
| tuple = tuples.get(1); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(count.doubleValue() == 2); |
| |
| tuple = tuples.get(2); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 11); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(3); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 4); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(4); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 3); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(5); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 2); |
| assertTrue(count.doubleValue() == 2); |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "buckets=\"level1_s, level2_s\", " |
| + "bucketSorts=\"level1_s desc, level2_s desc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), count(*)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 6); |
| |
| tuple = tuples.get(0); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 11); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(1); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 4); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(2); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 35); |
| assertTrue(count.doubleValue() == 3); |
| |
| tuple = tuples.get(3); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 3); |
| assertTrue(count.doubleValue() == 1); |
| |
| tuple = tuples.get(4); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(sumi.longValue() == 15); |
| assertTrue(count.doubleValue() == 2); |
| |
| tuple = tuples.get(5); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| sumi = tuple.getDouble("sum(a_i)"); |
| count = tuple.getDouble("count(*)"); |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(sumi.longValue() == 2); |
| assertTrue(count.doubleValue() == 2); |
| |
| //Add sorts for percentile |
| |
| clause = "facet(" |
| + "collection1, " |
| + "q=\"*:*\", " |
| + "buckets=\"level1_s, level2_s\", " |
| + "bucketSorts=\"per(a_i, 50) desc, std(a_i) desc\", " |
| + "bucketSizeLimit=100, " |
| + "std(a_i), per(a_i,50)" |
| + ")"; |
| |
| stream = factory.constructStream(clause); |
| tuples = getTuples(stream); |
| |
| assert(tuples.size() == 6); |
| |
| tuple = tuples.get(0); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| double stdi = tuple.getDouble("std(a_i)"); |
| double peri = tuple.getDouble("per(a_i,50)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(stdi == 1.2472191289246535D); |
| assertTrue(peri == 12.0D); |
| |
| tuple = tuples.get(1); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| stdi = tuple.getDouble("std(a_i)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(stdi == 0.0D); |
| assertTrue(peri == 11.0); |
| |
| tuple = tuples.get(2); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| stdi = tuple.getDouble("std(a_i)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("b")); |
| assertTrue(stdi == 6.5D); |
| assertTrue(peri == 7.5D); |
| |
| tuple = tuples.get(3); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| stdi = tuple.getDouble("std(a_i)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| |
| assertTrue(bucket1.equals("hello4")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(stdi == 0.0D); |
| assertTrue(peri == 4.0D); |
| |
| tuple = tuples.get(4); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| stdi = tuple.getDouble("std(a_i)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| |
| assertTrue(bucket1.equals("hello3")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(stdi == 0.0D); |
| assertTrue(peri == 3.0D); |
| |
| tuple = tuples.get(5); |
| bucket1 = tuple.getString("level1_s"); |
| bucket2 = tuple.getString("level2_s"); |
| stdi = tuple.getDouble("std(a_i)"); |
| peri = tuple.getDouble("per(a_i,50)"); |
| |
| assertTrue(bucket1.equals("hello0")); |
| assertTrue(bucket2.equals("a")); |
| assertTrue(stdi == 1.0D); |
| assertTrue(peri == 1.0D); |
| |
| } |
| |
| @Test |
| public void testTopicStream() throws Exception { |
| Assume.assumeTrue(!useAlias); |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1") |
| .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2") |
| .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3") |
| .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4") |
| .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5") |
| .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6") |
| .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7") |
| .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8") |
| .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("topic", TopicStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("daemon", DaemonStream.class); |
| |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| SolrClientCache cache = new SolrClientCache(); |
| |
| try { |
| //Store checkpoints in the same index as the main documents. This perfectly valid |
| expression = StreamExpressionParser.parse("topic(collection1, collection1, q=\"a_s:hello\", fl=\"id\", id=\"1000000\", checkpointEvery=3)"); |
| |
| stream = factory.constructStream(expression); |
| StreamContext context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| //Should be zero because the checkpoints will be set to the highest vesion on the shards. |
| assertEquals(tuples.size(), 0); |
| |
| cluster.getSolrClient().commit("collection1"); |
| //Now check to see if the checkpoints are present |
| |
| expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| assertEquals(tuples.size(), 1); |
| List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss"); |
| assertEquals(checkpoints.size(), 2); |
| Long version1 = tuples.get(0).getLong("_version_"); |
| |
| //Index a few more documents |
| new UpdateRequest() |
| .add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| expression = StreamExpressionParser.parse("topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2)"); |
| |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| try { |
| stream.open(); |
| Tuple tuple1 = stream.read(); |
| assertEquals((long) tuple1.getLong("id"), 10l); |
| cluster.getSolrClient().commit("collection1"); |
| |
| // Checkpoint should not have changed. |
| expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); |
| TupleStream cstream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| cstream.setStreamContext(context); |
| tuples = getTuples(cstream); |
| |
| assertEquals(tuples.size(), 1); |
| checkpoints = tuples.get(0).getStrings("checkpoint_ss"); |
| assertEquals(checkpoints.size(), 2); |
| Long version2 = tuples.get(0).getLong("_version_"); |
| assertEquals(version1, version2); |
| |
| Tuple tuple2 = stream.read(); |
| cluster.getSolrClient().commit("collection1"); |
| assertEquals((long) tuple2.getLong("id"), 11l); |
| |
| //Checkpoint should have changed. |
| expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); |
| cstream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| cstream.setStreamContext(context); |
| tuples = getTuples(cstream); |
| |
| assertEquals(tuples.size(), 1); |
| checkpoints = tuples.get(0).getStrings("checkpoint_ss"); |
| assertEquals(checkpoints.size(), 2); |
| Long version3 = tuples.get(0).getLong("_version_"); |
| assertTrue(version3 > version2); |
| |
| Tuple tuple3 = stream.read(); |
| assertTrue(tuple3.EOF); |
| } finally { |
| stream.close(); |
| } |
| |
| //Test with the DaemonStream |
| |
| DaemonStream dstream = null; |
| try { |
| expression = StreamExpressionParser.parse("daemon(topic(collection1, collection1, fl=\"id\", q=\"a_s:hello\", id=\"1000000\", checkpointEvery=2), id=\"test\", runInterval=\"1000\", queueSize=\"9\")"); |
| dstream = (DaemonStream) factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| dstream.setStreamContext(context); |
| |
| //Index a few more documents |
| new UpdateRequest() |
| .add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| //Start reading from the DaemonStream |
| Tuple tuple = null; |
| |
| dstream.open(); |
| tuple = dstream.read(); |
| assertEquals(12, (long) tuple.getLong(id)); |
| tuple = dstream.read(); |
| assertEquals(13, (long) tuple.getLong(id)); |
| cluster.getSolrClient().commit("collection1"); // We want to see if the version has been updated after reading two tuples |
| |
| //Index a few more documents |
| new UpdateRequest() |
| .add(id, "14", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "15", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| //Read from the same DaemonStream stream |
| |
| tuple = dstream.read(); |
| assertEquals(14, (long) tuple.getLong(id)); |
| tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream. |
| assertEquals(15, (long) tuple.getLong(id)); |
| |
| dstream.shutdown(); |
| tuple = dstream.read(); |
| assertTrue(tuple.EOF); |
| } finally { |
| dstream.close(); |
| } |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| |
| @Test |
| // commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018 |
| public void testParallelTopicStream() throws Exception { |
| |
| Assume.assumeTrue(!useAlias); |
| |
| new UpdateRequest() |
| .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1", "subject", "ha ha bla blah0") |
| .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2", "subject", "ha ha bla blah2") |
| .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "subject", "ha ha bla blah3") |
| .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "subject", "ha ha bla blah4") |
| .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5", "subject", "ha ha bla blah5") |
| .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6", "subject", "ha ha bla blah6") |
| .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7", "subject", "ha ha bla blah7") |
| .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8", "subject", "ha ha bla blah8") |
| .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9", "subject", "ha ha bla blah9") |
| .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10", "subject", "ha ha bla blah10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("topic", TopicStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("parallel", ParallelStream.class) |
| .withFunctionName("daemon", DaemonStream.class); |
| |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| SolrClientCache cache = new SolrClientCache(); |
| |
| try { |
| //Store checkpoints in the same index as the main documents. This is perfectly valid |
| expression = StreamExpressionParser.parse("parallel(collection1, " + |
| "workers=\"2\", " + |
| "sort=\"_version_ asc\"," + |
| "topic(collection1, " + |
| "collection1, " + |
| "q=\"a_s:hello\", " + |
| "fl=\"id\", " + |
| "id=\"1000000\", " + |
| "partitionKeys=\"id\"))"); |
| |
| stream = factory.constructStream(expression); |
| StreamContext context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| //Should be zero because the checkpoints will be set to the highest version on the shards. |
| assertEquals(tuples.size(), 0); |
| |
| cluster.getSolrClient().commit("collection1"); |
| //Now check to see if the checkpoints are present |
| |
| expression = StreamExpressionParser.parse("search(collection1, q=\"id:1000000*\", fl=\"id, checkpoint_ss, _version_\", sort=\"id asc\")"); |
| |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| assertEquals(tuples.size(), 2); |
| List<String> checkpoints = tuples.get(0).getStrings("checkpoint_ss"); |
| assertEquals(checkpoints.size(), 2); |
| String id1 = tuples.get(0).getString("id"); |
| String id2 = tuples.get(1).getString("id"); |
| assertTrue(id1.equals("1000000_0")); |
| assertTrue(id2.equals("1000000_1")); |
| |
| //Index a few more documents |
| new UpdateRequest() |
| .add(id, "10", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "11", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| expression = StreamExpressionParser.parse("parallel(collection1, " + |
| "workers=\"2\", " + |
| "sort=\"_version_ asc\"," + |
| "topic(collection1, " + |
| "collection1, " + |
| "q=\"a_s:hello\", " + |
| "fl=\"id\", " + |
| "id=\"1000000\", " + |
| "partitionKeys=\"id\"))"); |
| |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| assertTopicRun(stream, "10", "11"); |
| |
      //Test with an initial checkpoint. This should pull all documents.
| |
| expression = StreamExpressionParser.parse("parallel(collection1, " + |
| "workers=\"2\", " + |
| "sort=\"_version_ asc\"," + |
| "topic(collection1, " + |
| "collection1, " + |
| "q=\"a_s:hello\", " + |
| "fl=\"id\", " + |
| "id=\"2000000\", " + |
| "initialCheckpoint=\"0\", " + |
| "partitionKeys=\"id\"))"); |
| |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| assertTopicRun(stream, "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); |
| |
| //Add more documents |
| //Index a few more documents |
| new UpdateRequest() |
| .add(id, "12", "a_s", "hello", "a_i", "13", "a_f", "9") |
| .add(id, "13", "a_s", "hello", "a_i", "14", "a_f", "10") |
| .commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| //Run the same topic again including the initialCheckpoint. It should start where it left off. |
| //initialCheckpoint should be ignored for all but the first run. |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| assertTopicRun(stream, "12", "13"); |
| |
| //Test text extraction |
| |
| expression = StreamExpressionParser.parse("parallel(collection1, " + |
| "workers=\"2\", " + |
| "sort=\"_version_ asc\"," + |
| "topic(collection1, " + |
| "collection1, " + |
| "q=\"subject:bla\", " + |
| "fl=\"subject\", " + |
| "id=\"3000000\", " + |
| "initialCheckpoint=\"0\", " + |
| "partitionKeys=\"id\"))"); |
| |
| stream = factory.constructStream(expression); |
| context = new StreamContext(); |
| context.setSolrClientCache(cache); |
| stream.setStreamContext(context); |
| |
| assertTopicSubject(stream, "ha ha bla blah0", |
| "ha ha bla blah1", |
| "ha ha bla blah2", |
| "ha ha bla blah3", |
| "ha ha bla blah4", |
| "ha ha bla blah5", |
| "ha ha bla blah6", |
| "ha ha bla blah7", |
| "ha ha bla blah8", |
| "ha ha bla blah9", |
| "ha ha bla blah10"); |
| |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| |
| |
| |
| |
| @Test |
| public void testEchoStream() throws Exception { |
| String expr = "echo(hello world)"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| String s = (String)tuples.get(0).get("echo"); |
| assertTrue(s.equals("hello world")); |
| |
| expr = "echo(\"hello world\")"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| s = (String)tuples.get(0).get("echo"); |
| assertTrue(s.equals("hello world")); |
| |
| expr = "echo(\"hello, world\")"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| s = (String)tuples.get(0).get("echo"); |
| assertTrue(s.equals("hello, world")); |
| |
| expr = "echo(\"hello, \\\"t\\\" world\")"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| s = (String)tuples.get(0).get("echo"); |
| |
| assertTrue(s.equals("hello, \"t\" world")); |
| |
| expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"echo asc\", echo(\"hello, \\\"t\\\" world\"))"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 2); |
| s = (String)tuples.get(0).get("echo"); |
| assertTrue(s.equals("hello, \"t\" world")); |
| s = (String)tuples.get(1).get("echo"); |
| assertTrue(s.equals("hello, \"t\" world")); |
| |
| expr = "echo(\"tuytuy iuyiuyi iuyiuyiu iuyiuyiuyiu iuyi iuyiyiuy iuyiuyiu iyiuyiu iyiuyiuyyiyiu yiuyiuyi" + |
| " yiuyiuyi yiuyiuuyiu yiyiuyiyiu iyiuyiuyiuiuyiu yiuyiuyi yiuyiy yiuiyiuiuy\")"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| s = (String)tuples.get(0).get("echo"); |
| |
| assertTrue(s.equals("tuytuy iuyiuyi iuyiuyiu iuyiuyiuyiu iuyi iuyiyiuy iuyiuyiu iyiuyiu iyiuyiuyyiyiu yiuyiuyi yiuyiuyi " + |
| "yiuyiuuyiu yiyiuyiyiu iyiuyiuyiuiuyiu yiuyiuyi yiuyiy yiuiyiuiuy")); |
| } |
| |
| @Test |
| public void testEvalStream() throws Exception { |
| UpdateRequest updateRequest = new UpdateRequest(); |
| updateRequest.add(id, "hello", "test_t", "l b c d c"); |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String expr = "eval(select(echo(\"search("+COLLECTIONORALIAS+", q=\\\"*:*\\\", fl=id, sort=\\\"id desc\\\")\"), echo as expr_s))"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| String s = (String)tuples.get(0).get("id"); |
| assertTrue(s.equals("hello")); |
| } |
| |
| private String getDateString(String year, String month, String day) { |
| return year+"-"+month+"-"+day+"T00:00:00Z"; |
| } |
| |
| @Test |
| public void testTimeSeriesStream() throws Exception { |
| UpdateRequest updateRequest = new UpdateRequest(); |
| |
| int i=0; |
| while(i<50) { |
| updateRequest.add(id, "id_"+(++i),"test_dt", getDateString("2016", "5", "1"), "price_f", "400.00"); |
| } |
| |
| while(i<100) { |
| updateRequest.add(id, "id_"+(++i),"test_dt", getDateString("2015", "5", "1"), "price_f", "300.0"); |
| } |
| |
| while(i<150) { |
| updateRequest.add(id, "id_"+(++i),"test_dt", getDateString("2014", "5", "1"), "price_f", "500.0"); |
| } |
| |
| while(i<250) { |
| updateRequest.add(id, "id_"+(++i),"test_dt", getDateString("2013", "5", "1"), "price_f", "100.00"); |
| } |
| |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " + |
| "end=\"2017-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50), countDist(id))"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| |
| assertTrue(tuples.size() == 5); |
| |
| assertTrue(tuples.get(0).get("test_dt").equals("2013-01-01T01:00:00Z")); |
| assertTrue(tuples.get(0).getLong("count(*)").equals(100L)); |
| assertTrue(tuples.get(0).getDouble("sum(price_f)").equals(10000D)); |
| assertTrue(tuples.get(0).getDouble("max(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("min(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("avg(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(0).getDouble("per(price_f,50)").equals(100D)); |
| assertTrue(tuples.get(0).getLong("countDist(id)").equals(100L)); |
| |
| |
| assertTrue(tuples.get(1).get("test_dt").equals("2014-01-01T01:00:00Z")); |
| assertTrue(tuples.get(1).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(25000D)); |
| assertTrue(tuples.get(1).getDouble("max(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("min(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(1).getDouble("per(price_f,50)").equals(500D)); |
| assertTrue(tuples.get(1).getLong("countDist(id)").equals(50L)); |
| |
| |
| assertTrue(tuples.get(2).get("test_dt").equals("2015-01-01T01:00:00Z")); |
| assertTrue(tuples.get(2).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(15000D)); |
| assertTrue(tuples.get(2).getDouble("max(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("min(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("avg(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(2).getDouble("per(price_f,50)").equals(300D)); |
| assertTrue(tuples.get(2).getLong("countDist(id)").equals(50L)); |
| |
| |
| assertTrue(tuples.get(3).get("test_dt").equals("2016-01-01T01:00:00Z")); |
| assertTrue(tuples.get(3).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(20000D)); |
| assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("avg(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(3).getDouble("per(price_f,50)").equals(400D)); |
| assertTrue(tuples.get(3).getLong("countDist(id)").equals(50L)); |
| |
| |
| assertTrue(tuples.get(4).get("test_dt").equals("2017-01-01T01:00:00Z")); |
| assertEquals((long)tuples.get(4).getLong("count(*)"), 0L); |
| assertEquals(tuples.get(4).getDouble("sum(price_f)"), 0D, 0); |
| assertEquals(tuples.get(4).getDouble("max(price_f)"),0D, 0); |
| assertEquals(tuples.get(4).getDouble("min(price_f)"), 0D, 0); |
| assertTrue(tuples.get(4).getDouble("avg(price_f)").equals(0D)); |
| assertTrue(tuples.get(4).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(4).getDouble("per(price_f,50)").equals(0D)); |
| assertTrue(tuples.get(4).getLong("countDist(id)").equals(0L)); |
| |
| expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " + |
| "end=\"2016-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "format=\"yyyy\", " + |
| "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50))"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 4); |
| |
| assertTrue(tuples.get(0).get("test_dt").equals("2013")); |
| assertTrue(tuples.get(0).getLong("count(*)").equals(100L)); |
| assertTrue(tuples.get(0).getDouble("sum(price_f)").equals(10000D)); |
| assertTrue(tuples.get(0).getDouble("max(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("min(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("avg(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(0).getDouble("per(price_f,50)").equals(100D)); |
| |
| assertTrue(tuples.get(1).get("test_dt").equals("2014")); |
| assertTrue(tuples.get(1).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(25000D)); |
| assertTrue(tuples.get(1).getDouble("max(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("min(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(1).getDouble("per(price_f,50)").equals(500D)); |
| |
| |
| assertTrue(tuples.get(2).get("test_dt").equals("2015")); |
| assertTrue(tuples.get(2).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(15000D)); |
| assertTrue(tuples.get(2).getDouble("max(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("min(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("avg(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(2).getDouble("per(price_f,50)").equals(300D)); |
| |
| assertTrue(tuples.get(3).get("test_dt").equals("2016")); |
| assertTrue(tuples.get(3).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(20000D)); |
| assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("avg(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(3).getDouble("per(price_f,50)").equals(400D)); |
| |
| |
| expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " + |
| "end=\"2016-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "format=\"yyyy-MM\", " + |
| "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50))"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 4); |
| |
| assertTrue(tuples.get(0).get("test_dt").equals("2013-01")); |
| assertTrue(tuples.get(0).getLong("count(*)").equals(100L)); |
| assertTrue(tuples.get(0).getDouble("sum(price_f)").equals(10000D)); |
| assertTrue(tuples.get(0).getDouble("max(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("min(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("avg(price_f)").equals(100D)); |
| assertTrue(tuples.get(0).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(0).getDouble("per(price_f,50)").equals(100D)); |
| |
| assertTrue(tuples.get(1).get("test_dt").equals("2014-01")); |
| assertTrue(tuples.get(1).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(25000D)); |
| assertTrue(tuples.get(1).getDouble("max(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("min(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(500D)); |
| assertTrue(tuples.get(1).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(1).getDouble("per(price_f,50)").equals(500D)); |
| |
| assertTrue(tuples.get(2).get("test_dt").equals("2015-01")); |
| assertTrue(tuples.get(2).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(15000D)); |
| assertTrue(tuples.get(2).getDouble("max(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("min(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("avg(price_f)").equals(300D)); |
| assertTrue(tuples.get(2).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(2).getDouble("per(price_f,50)").equals(300D)); |
| |
| assertTrue(tuples.get(3).get("test_dt").equals("2016-01")); |
| assertTrue(tuples.get(3).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(20000D)); |
| assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("avg(price_f)").equals(400D)); |
| assertTrue(tuples.get(3).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(3).getDouble("per(price_f,50)").equals(400D)); |
| |
| |
| expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2012-01-01T01:00:00.000Z\", " + |
| "end=\"2016-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "format=\"yyyy-MM\", " + |
| "count(*), sum(price_f), max(price_f), min(price_f), avg(price_f), std(price_f), per(price_f, 50))"; |
| paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| solrStream = new SolrStream(url, paramsLoc); |
| |
| solrStream.setStreamContext(context); |
| tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 5); |
| assertTrue(tuples.get(0).get("test_dt").equals("2012-01")); |
| assertTrue(tuples.get(0).getLong("count(*)").equals(0L)); |
| assertTrue(tuples.get(0).getDouble("sum(price_f)") == 0); |
| assertTrue(tuples.get(0).getDouble("max(price_f)") == 0); |
| assertTrue(tuples.get(0).getDouble("min(price_f)") == 0); |
| assertTrue(tuples.get(0).getDouble("avg(price_f)").equals(0D)); |
| assertTrue(tuples.get(0).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(0).getDouble("per(price_f,50)").equals(0D)); |
| |
| assertTrue(tuples.get(1).get("test_dt").equals("2013-01")); |
| assertTrue(tuples.get(1).getLong("count(*)").equals(100L)); |
| assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(10000D)); |
| assertTrue(tuples.get(1).getDouble("max(price_f)").equals(100D)); |
| assertTrue(tuples.get(1).getDouble("min(price_f)").equals(100D)); |
| assertTrue(tuples.get(1).getDouble("avg(price_f)").equals(100D)); |
| assertTrue(tuples.get(1).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(1).getDouble("per(price_f,50)").equals(100D)); |
| |
| assertTrue(tuples.get(2).get("test_dt").equals("2014-01")); |
| assertTrue(tuples.get(2).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(25000D)); |
| assertTrue(tuples.get(2).getDouble("max(price_f)").equals(500D)); |
| assertTrue(tuples.get(2).getDouble("min(price_f)").equals(500D)); |
| assertTrue(tuples.get(2).getDouble("avg(price_f)").equals(500D)); |
| assertTrue(tuples.get(2).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(2).getDouble("per(price_f,50)").equals(500D)); |
| |
| assertTrue(tuples.get(3).get("test_dt").equals("2015-01")); |
| assertTrue(tuples.get(3).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(15000D)); |
| assertTrue(tuples.get(3).getDouble("max(price_f)").equals(300D)); |
| assertTrue(tuples.get(3).getDouble("min(price_f)").equals(300D)); |
| assertTrue(tuples.get(3).getDouble("avg(price_f)").equals(300D)); |
| assertTrue(tuples.get(3).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(3).getDouble("per(price_f,50)").equals(300D)); |
| |
| assertTrue(tuples.get(4).get("test_dt").equals("2016-01")); |
| assertTrue(tuples.get(4).getLong("count(*)").equals(50L)); |
| assertTrue(tuples.get(4).getDouble("sum(price_f)").equals(20000D)); |
| assertTrue(tuples.get(4).getDouble("max(price_f)").equals(400D)); |
| assertTrue(tuples.get(4).getDouble("min(price_f)").equals(400D)); |
| assertTrue(tuples.get(4).getDouble("avg(price_f)").equals(400D)); |
| assertTrue(tuples.get(4).getDouble("std(price_f)").equals(0D)); |
| assertTrue(tuples.get(4).getDouble("per(price_f,50)").equals(400D)); |
| } |
| |
| @Test |
| public void testTupleStream() throws Exception { |
| UpdateRequest updateRequest = new UpdateRequest(); |
| updateRequest.add(id, "hello", "test_t", "l b c d c e"); |
| updateRequest.add(id, "hello1", "test_t", "l b c d c"); |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String expr = "search("+COLLECTIONORALIAS+", q=\"`c d c`\", fl=\"id,test_t\", sort=\"id desc\")"; |
| |
| //Add a Stream and an Evaluator to the Tuple. |
| String cat = "tuple(results="+expr+", sum=add(1,1))"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", cat); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| List<Map> results = (List<Map>)tuples.get(0).get("results"); |
| assertTrue(results.get(0).get("id").equals("hello1")); |
| assertTrue(results.get(0).get("test_t").equals("l b c d c")); |
| assertTrue(results.get(1).get("id").equals("hello")); |
| assertTrue(results.get(1).get("test_t").equals("l b c d c e")); |
| |
| assertTrue(tuples.get(0).getLong("sum").equals(2L)); |
| |
| } |
| |
| @Test |
| public void testSearchBacktick() throws Exception { |
| UpdateRequest updateRequest = new UpdateRequest(); |
| updateRequest.add(id, "hello", "test_t", "l b c d c e"); |
| updateRequest.add(id, "hello1", "test_t", "l b c d c"); |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| String expr = "search("+COLLECTIONORALIAS+", q=\"`c d c e`\", fl=\"id,test_t\", sort=\"id desc\")"; |
| |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertTrue(tuples.size() == 1); |
| Tuple tuple = tuples.get(0); |
| assertTrue(tuple.get("id").equals("hello")); |
| assertTrue(tuple.get("test_t").equals("l b c d c e")); |
| } |
| |
| |
| |
| private Map<String,Double> getIdToLabel(TupleStream stream, String outField) throws IOException { |
| Map<String, Double> idToLabel = new HashMap<>(); |
| List<Tuple> tuples = getTuples(stream); |
| for (Tuple tuple : tuples) { |
| idToLabel.put(tuple.getString("id"), tuple.getDouble(outField)); |
| } |
| return idToLabel; |
| } |
| |
| |
| @Test |
| public void testBasicTextLogitStream() throws Exception { |
| Assume.assumeTrue(!useAlias); |
| |
| CollectionAdminRequest.createCollection("destinationCollection", "ml", 2, 1).process(cluster.getSolrClient()); |
| cluster.waitForActiveCollection("destinationCollection", 2, 2); |
| |
| UpdateRequest updateRequest = new UpdateRequest(); |
| for (int i = 0; i < 5000; i+=2) { |
| updateRequest.add(id, String.valueOf(i), "tv_text", "a b c c d", "out_i", "1"); |
| updateRequest.add(id, String.valueOf(i+1), "tv_text", "a b e e f", "out_i", "0"); |
| } |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("features", FeaturesSelectionStream.class) |
| .withFunctionName("train", TextLogitStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("update", UpdateStream.class); |
| try { |
| expression = StreamExpressionParser.parse("features(collection1, q=\"*:*\", featureSet=\"first\", field=\"tv_text\", outcome=\"out_i\", numTerms=4)"); |
| stream = new FeaturesSelectionStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 4); |
| HashSet<String> terms = new HashSet<>(); |
| for (Tuple tuple : tuples) { |
| terms.add((String) tuple.get("term_s")); |
| } |
| assertTrue(terms.contains("d")); |
| assertTrue(terms.contains("c")); |
| assertTrue(terms.contains("e")); |
| assertTrue(terms.contains("f")); |
| |
| String textLogitExpression = "train(" + |
| "collection1, " + |
| "features(collection1, q=\"*:*\", featureSet=\"first\", field=\"tv_text\", outcome=\"out_i\", numTerms=4)," + |
| "q=\"*:*\", " + |
| "name=\"model\", " + |
| "field=\"tv_text\", " + |
| "outcome=\"out_i\", " + |
| "maxIterations=100)"; |
| stream = factory.constructStream(textLogitExpression); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| Tuple lastTuple = tuples.get(tuples.size() - 1); |
| List<Double> lastWeights = lastTuple.getDoubles("weights_ds"); |
| Double[] lastWeightsArray = lastWeights.toArray(new Double[lastWeights.size()]); |
| |
| // first feature is bias value |
| Double[] testRecord = {1.0, 1.17, 0.691, 0.0, 0.0}; |
| double d = sum(multiply(testRecord, lastWeightsArray)); |
| double prob = sigmoid(d); |
| assertEquals(prob, 1.0, 0.1); |
| |
| // first feature is bias value |
| Double[] testRecord2 = {1.0, 0.0, 0.0, 1.17, 0.691}; |
| d = sum(multiply(testRecord2, lastWeightsArray)); |
| prob = sigmoid(d); |
| assertEquals(prob, 0, 0.1); |
| |
| stream = factory.constructStream("update(destinationCollection, batchSize=5, " + textLogitExpression + ")"); |
| getTuples(stream); |
| cluster.getSolrClient().commit("destinationCollection"); |
| |
| stream = factory.constructStream("search(destinationCollection, " + |
| "q=*:*, " + |
| "fl=\"iteration_i,* \", " + |
| "rows=100, " + |
| "sort=\"iteration_i desc\")"); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| assertEquals(100, tuples.size()); |
| Tuple lastModel = tuples.get(0); |
| ClassificationEvaluation evaluation = ClassificationEvaluation.create(lastModel.getFields()); |
| assertTrue(evaluation.getF1() >= 1.0); |
| assertEquals(Math.log(5000.0 / (2500 + 1)), lastModel.getDoubles("idfs_ds").get(0), 0.0001); |
| // make sure the tuples is retrieved in correct order |
| Tuple firstTuple = tuples.get(99); |
| assertEquals(1L, (long) firstTuple.getLong("iteration_i")); |
| } finally { |
| CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient()); |
| solrClientCache.close(); |
| } |
| } |
| |
| private double sigmoid(double in) { |
| |
| double d = 1.0 / (1+Math.exp(-in)); |
| return d; |
| } |
| |
| private double[] multiply(Double[] vec1, Double[] vec2) { |
| double[] working = new double[vec1.length]; |
| for(int i=0; i<vec1.length; i++) { |
| working[i]= vec1[i]*vec2[i]; |
| } |
| |
| return working; |
| } |
| |
| private double sum(double[] vec) { |
| double d = 0.0; |
| |
| for(double v : vec) { |
| d += v; |
| } |
| |
| return d; |
| } |
| |
| @Test |
| public void testFeaturesSelectionStream() throws Exception { |
| Assume.assumeTrue(!useAlias); |
| |
| CollectionAdminRequest.createCollection("destinationCollection", "ml", 2, 1).process(cluster.getSolrClient()); |
| cluster.waitForActiveCollection("destinationCollection", 2, 2); |
| |
| UpdateRequest updateRequest = new UpdateRequest(); |
| for (int i = 0; i < 5000; i+=2) { |
| updateRequest.add(id, String.valueOf(i), "whitetok", "a b c d", "out_i", "1"); |
| updateRequest.add(id, String.valueOf(i+1), "whitetok", "a b e f", "out_i", "0"); |
| } |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| StreamExpression expression; |
| TupleStream stream; |
| List<Tuple> tuples; |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache solrClientCache = new SolrClientCache(); |
| streamContext.setSolrClientCache(solrClientCache); |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress()) |
| .withFunctionName("featuresSelection", FeaturesSelectionStream.class) |
| .withFunctionName("search", CloudSolrStream.class) |
| .withFunctionName("update", UpdateStream.class); |
| |
| |
| try { |
| String featuresExpression = "featuresSelection(collection1, q=\"*:*\", featureSet=\"first\", field=\"whitetok\", outcome=\"out_i\", numTerms=4)"; |
| // basic |
| expression = StreamExpressionParser.parse(featuresExpression); |
| stream = new FeaturesSelectionStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 4); |
| |
| assertTrue(tuples.get(0).get("term_s").equals("c")); |
| assertTrue(tuples.get(1).get("term_s").equals("d")); |
| assertTrue(tuples.get(2).get("term_s").equals("e")); |
| assertTrue(tuples.get(3).get("term_s").equals("f")); |
| |
| // update |
| expression = StreamExpressionParser.parse("update(destinationCollection, " + featuresExpression + ")"); |
| stream = new UpdateStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| getTuples(stream); |
| cluster.getSolrClient().commit("destinationCollection"); |
| |
| expression = StreamExpressionParser.parse("search(destinationCollection, q=featureSet_s:first, fl=\"index_i, term_s\", sort=\"index_i asc\")"); |
| stream = new CloudSolrStream(expression, factory); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| assertEquals(4, tuples.size()); |
| assertTrue(tuples.get(0).get("term_s").equals("c")); |
| assertTrue(tuples.get(1).get("term_s").equals("d")); |
| assertTrue(tuples.get(2).get("term_s").equals("e")); |
| assertTrue(tuples.get(3).get("term_s").equals("f")); |
| } finally { |
| CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient()); |
| solrClientCache.close(); |
| } |
| } |
| |
| |
| @Test |
| public void testSignificantTermsStream() throws Exception { |
| |
| UpdateRequest updateRequest = new UpdateRequest(); |
| for (int i = 0; i < 5000; i++) { |
| updateRequest.add(id, "a"+i, "test_t", "a b c d m l"); |
| } |
| |
| for (int i = 0; i < 5000; i++) { |
| updateRequest.add(id, "b"+i, "test_t", "a b e f"); |
| } |
| |
| for (int i = 0; i < 900; i++) { |
| updateRequest.add(id, "c"+i, "test_t", "c"); |
| } |
| |
| for (int i = 0; i < 600; i++) { |
| updateRequest.add(id, "d"+i, "test_t", "d"); |
| } |
| |
| for (int i = 0; i < 500; i++) { |
| updateRequest.add(id, "e"+i, "test_t", "m"); |
| } |
| |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| TupleStream stream; |
| List<Tuple> tuples; |
| |
| StreamFactory factory = new StreamFactory() |
| .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) |
| .withDefaultZkHost(cluster.getZkServer().getZkAddress()) |
| .withFunctionName("significantTerms", SignificantTermsStream.class); |
| |
| StreamContext streamContext = new StreamContext(); |
| SolrClientCache cache = new SolrClientCache(); |
| streamContext.setSolrClientCache(cache); |
| try { |
| |
| String significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, minTermLength=1, maxDocFreq=\".5\")"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 3); |
| assertTrue(tuples.get(0).get("term").equals("l")); |
| assertTrue(tuples.get(0).getLong("background") == 5000); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| |
| assertTrue(tuples.get(1).get("term").equals("m")); |
| assertTrue(tuples.get(1).getLong("background") == 5500); |
| assertTrue(tuples.get(1).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(2).get("term").equals("d")); |
| assertTrue(tuples.get(2).getLong("background") == 5600); |
| assertTrue(tuples.get(2).getLong("foreground") == 5000); |
| |
| //Test maxDocFreq |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, maxDocFreq=2650, minTermLength=1)"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 1); |
| assertTrue(tuples.get(0).get("term").equals("l")); |
| assertTrue(tuples.get(0).getLong("background") == 5000); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| //Test maxDocFreq percentage |
| |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, maxDocFreq=\".45\", minTermLength=1)"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| assert (tuples.size() == 1); |
| assertTrue(tuples.get(0).get("term").equals("l")); |
| assertTrue(tuples.get(0).getLong("background") == 5000); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| |
| //Test min doc freq |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 3); |
| |
| assertTrue(tuples.get(0).get("term").equals("m")); |
| assertTrue(tuples.get(0).getLong("background") == 5500); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(1).get("term").equals("d")); |
| assertTrue(tuples.get(1).getLong("background") == 5600); |
| assertTrue(tuples.get(1).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(2).get("term").equals("c")); |
| assertTrue(tuples.get(2).getLong("background") == 5900); |
| assertTrue(tuples.get(2).getLong("foreground") == 5000); |
| |
| |
| //Test min doc freq percent |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=3, minDocFreq=\".478\", minTermLength=1, maxDocFreq=\".5\")"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 1); |
| |
| assertTrue(tuples.get(0).get("term").equals("c")); |
| assertTrue(tuples.get(0).getLong("background") == 5900); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| //Test limit |
| |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=2, minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 2); |
| |
| assertTrue(tuples.get(0).get("term").equals("m")); |
| assertTrue(tuples.get(0).getLong("background") == 5500); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(1).get("term").equals("d")); |
| assertTrue(tuples.get(1).getLong("background") == 5600); |
| assertTrue(tuples.get(1).getLong("foreground") == 5000); |
| |
| //Test term length |
| |
| significantTerms = "significantTerms(collection1, q=\"id:a*\", field=\"test_t\", limit=2, minDocFreq=\"2700\", minTermLength=2)"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(streamContext); |
| tuples = getTuples(stream); |
| assert (tuples.size() == 0); |
| |
| |
| //Test with shards parameter |
| List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext); |
| |
| Map<String, List<String>> shardsMap = new HashMap<>(); |
| shardsMap.put("myCollection", shardUrls); |
| StreamContext context = new StreamContext(); |
| context.put("shards", shardsMap); |
| context.setSolrClientCache(cache); |
| significantTerms = "significantTerms(myCollection, q=\"id:a*\", field=\"test_t\", limit=2, minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")"; |
| stream = factory.constructStream(significantTerms); |
| stream.setStreamContext(context); |
| tuples = getTuples(stream); |
| |
| assert (tuples.size() == 2); |
| |
| assertTrue(tuples.get(0).get("term").equals("m")); |
| assertTrue(tuples.get(0).getLong("background") == 5500); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(1).get("term").equals("d")); |
| assertTrue(tuples.get(1).getLong("background") == 5600); |
| assertTrue(tuples.get(1).getLong("foreground") == 5000); |
| |
| //Execersise the /stream hander |
| |
| //Add the shards http parameter for the myCollection |
| StringBuilder buf = new StringBuilder(); |
| for (String shardUrl : shardUrls) { |
| if (buf.length() > 0) { |
| buf.append(","); |
| } |
| buf.append(shardUrl); |
| } |
| |
| ModifiableSolrParams solrParams = new ModifiableSolrParams(); |
| solrParams.add("qt", "/stream"); |
| solrParams.add("expr", significantTerms); |
| solrParams.add("myCollection.shards", buf.toString()); |
| SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); |
| tuples = getTuples(solrStream); |
| assert (tuples.size() == 2); |
| |
| assertTrue(tuples.get(0).get("term").equals("m")); |
| assertTrue(tuples.get(0).getLong("background") == 5500); |
| assertTrue(tuples.get(0).getLong("foreground") == 5000); |
| |
| assertTrue(tuples.get(1).get("term").equals("d")); |
| assertTrue(tuples.get(1).getLong("background") == 5600); |
| assertTrue(tuples.get(1).getLong("foreground") == 5000); |
| |
| //Add a negative test to prove that it cannot find slices if shards parameter is removed |
| |
| try { |
| ModifiableSolrParams solrParamsBad = new ModifiableSolrParams(); |
| solrParamsBad.add("qt", "/stream"); |
| solrParamsBad.add("expr", significantTerms); |
| solrStream = new SolrStream(shardUrls.get(0), solrParamsBad); |
| tuples = getTuples(solrStream); |
| throw new Exception("Exception should have been thrown above"); |
| } catch (IOException e) { |
| assertTrue(e.getMessage().contains("Slices not found for myCollection")); |
| } |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| @Test |
| public void tooLargeForGetRequest() throws IOException, SolrServerException { |
| // Test expressions which are larger than GET can handle |
| UpdateRequest updateRequest = new UpdateRequest(); |
| for (int i = 0; i < 10; i++) { |
| updateRequest.add(id, "a"+i, "test_t", "a b c d m l"); |
| } |
| for(int i=1; i<=50; i++) { |
| updateRequest.add(id, "id_"+(i),"test_dt", getDateString("2016", "5", "1"), "price_f", "400.00"); |
| } |
| updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); |
| |
| SolrClientCache cache = new SolrClientCache(); |
| StreamContext streamContext = new StreamContext(); |
| streamContext.setSolrClientCache(cache); |
| // use filter() to allow being parsed as 'terms in set' query instead of a (weighted/scored) BooleanQuery |
| // so we don't trip too many boolean clauses |
| String longQuery = "\"filter(id:(" + IntStream.range(0, 4000).mapToObj(i -> "a").collect(Collectors.joining(" ", "", "")) + "))\""; |
| |
| try { |
| assertSuccess("significantTerms("+COLLECTIONORALIAS+", q="+longQuery+", field=\"test_t\", limit=3, minTermLength=1, maxDocFreq=\".5\")", streamContext); |
| String expr = "timeseries("+COLLECTIONORALIAS+", q="+longQuery+", start=\"2013-01-01T01:00:00.000Z\", " + |
| "end=\"2016-12-01T01:00:00.000Z\", " + |
| "gap=\"+1YEAR\", " + |
| "field=\"test_dt\", " + |
| "format=\"yyyy\", " + |
| "count(*), sum(price_f), max(price_f), min(price_f))"; |
| assertSuccess(expr, streamContext); |
| expr = "facet(" |
| + "collection1, " |
| + "q="+longQuery+", " |
| + "fl=\"a_s,a_i,a_f\", " |
| + "sort=\"a_s asc\", " |
| + "buckets=\"a_s\", " |
| + "bucketSorts=\"sum(a_i) asc\", " |
| + "bucketSizeLimit=100, " |
| + "sum(a_i), sum(a_f), " |
| + "min(a_i), min(a_f), " |
| + "max(a_i), max(a_f), " |
| + "avg(a_i), avg(a_f), " |
| + "count(*)" |
| + ")"; |
| assertSuccess(expr, streamContext); |
| expr = "stats(" + COLLECTIONORALIAS + ", q="+longQuery+", sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))"; |
| assertSuccess(expr, streamContext); |
| expr = "search(" + COLLECTIONORALIAS + ", q="+longQuery+", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"; |
| assertSuccess(expr, streamContext); |
| expr = "random(" + COLLECTIONORALIAS + ", q="+longQuery+", rows=\"1000\", fl=\"id, a_i\")"; |
| assertSuccess(expr, streamContext); |
| } finally { |
| cache.close(); |
| } |
| } |
| |
| @Test |
| public void testCatStreamSingleFile() throws Exception { |
| final String catStream = "cat(\"topLevel1.txt\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertEquals(4, tuples.size()); |
| |
| for (int i = 0; i < 4; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("topLevel1.txt line " + String.valueOf(i+1), t.get("line")); |
| assertEquals("topLevel1.txt", t.get("file")); |
| } |
| } |
| |
| @Test |
| public void testCatStreamSingleGzipFile() throws Exception { |
| final String catStream = "cat(\"topLevel1.txt.gz\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertEquals(4, tuples.size()); |
| |
| for (int i = 0; i < 4; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("topLevel1.txt.gz line " + String.valueOf(i+1), t.get("line")); |
| assertEquals("topLevel1.txt.gz", t.get("file")); |
| } |
| } |
| |
| @Test |
| public void testCatStreamEmptyFile() throws Exception { |
| final String catStream = "cat(\"topLevel-empty.txt\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| |
| assertEquals(0, tuples.size()); |
| } |
| |
| @Test |
| public void testCatStreamMultipleFilesOneEmpty() throws Exception { |
| final String catStream = "cat(\"topLevel1.txt,topLevel-empty.txt\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| |
| assertEquals(4, tuples.size()); |
| |
| for (int i = 0; i < 4; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("topLevel1.txt line " + String.valueOf(i+1), t.get("line")); |
| assertEquals("topLevel1.txt", t.get("file")); |
| } |
| } |
| |
| @Test |
| public void testCatStreamMaxLines() throws Exception { |
| final String catStream = "cat(\"topLevel1.txt\", maxLines=2)"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertEquals(2, tuples.size()); |
| |
| for (int i = 0; i < 2; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("topLevel1.txt line " + String.valueOf(i+1), t.get("line")); |
| assertEquals("topLevel1.txt", t.get("file")); |
| } |
| } |
| |
| @Test |
| public void testCatStreamDirectoryCrawl() throws Exception { |
| final String catStream = "cat(\"directory1\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertEquals(8, tuples.size()); |
| |
| final String expectedSecondLevel1Path = "directory1" + File.separator + "secondLevel1.txt"; |
| for (int i = 0; i < 4; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("secondLevel1.txt line " + String.valueOf(i+1), t.get("line")); |
| assertEquals(expectedSecondLevel1Path, t.get("file")); |
| } |
| |
| final String expectedSecondLevel2Path = "directory1" + File.separator + "secondLevel2.txt"; |
| for (int i = 4; i < 8; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("secondLevel2.txt line " + String.valueOf(i - 3), t.get("line")); |
| assertEquals(expectedSecondLevel2Path, t.get("file")); |
| } |
| } |
| |
| @Test |
| public void testCatStreamMultipleExplicitFiles() throws Exception { |
| final String catStream = "cat(\"topLevel1.txt,directory1" + File.separator + "secondLevel2.txt\")"; |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", catStream); |
| paramsLoc.set("qt", "/stream"); |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION; |
| |
| SolrStream solrStream = new SolrStream(url, paramsLoc); |
| |
| StreamContext context = new StreamContext(); |
| solrStream.setStreamContext(context); |
| List<Tuple> tuples = getTuples(solrStream); |
| assertEquals(8, tuples.size()); |
| |
| for (int i = 0; i < 4; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("topLevel1.txt line " + String.valueOf(i+1), t.get("line")); |
| assertEquals("topLevel1.txt", t.get("file")); |
| } |
| |
| final String expectedSecondLevel2Path = "directory1" + File.separator + "secondLevel2.txt"; |
| for (int i = 4; i < 8; i++) { |
| Tuple t = tuples.get(i); |
| assertEquals("secondLevel2.txt line " + String.valueOf(i - 3), t.get("line")); |
| assertEquals(expectedSecondLevel2Path, t.get("file")); |
| } |
| } |
| |
| private void assertSuccess(String expr, StreamContext streamContext) throws IOException { |
| ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); |
| paramsLoc.set("expr", expr); |
| paramsLoc.set("qt", "/stream"); |
| |
| String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; |
| TupleStream solrStream = new SolrStream(url, paramsLoc); |
| solrStream.setStreamContext(streamContext); |
| getTuples(solrStream); |
| } |
| |
| |
| private static Path findUserFilesDataDir() { |
| for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { |
| final String baseDir = cluster.getBaseDir().toAbsolutePath().toString(); |
| for (CoreDescriptor coreDescriptor : jetty.getCoreContainer().getCoreDescriptors()) { |
| if (coreDescriptor.getCollectionName().equals(FILESTREAM_COLLECTION)) { |
| return jetty.getCoreContainer().getUserFilesPath(); |
| } |
| } |
| } |
| |
| throw new IllegalStateException("Unable to determine data-dir for: "+ FILESTREAM_COLLECTION); |
| } |
| |
| /** |
| * Creates a tree of files underneath a provided data-directory. |
| * |
| * The filetree created looks like: |
| * |
| * dataDir |
| * |- topLevel1.txt |
| * |- topLevel2.txt |
| * |- topLevel-empty.txt |
| * |- directory1 |
| * |- secondLevel1.txt |
| * |- secondLevel2.txt |
| * |
| * Each file contains 4 lines. Each line looks like: "<filename> line <linenumber>" |
| */ |
| private static void populateFileStreamData(Path dataDir) throws Exception { |
| Files.createDirectories(dataDir); |
| Files.createDirectories(dataDir.resolve("directory1")); |
| |
| populateFileWithGzipData(dataDir.resolve("topLevel1.txt.gz")); |
| populateFileWithData(dataDir.resolve("topLevel1.txt")); |
| populateFileWithData(dataDir.resolve("topLevel2.txt")); |
| Files.createFile(dataDir.resolve("topLevel-empty.txt")); |
| populateFileWithData(dataDir.resolve("directory1").resolve("secondLevel1.txt")); |
| populateFileWithData(dataDir.resolve("directory1").resolve("secondLevel2.txt")); |
| } |
| |
| private static void populateFileWithData(Path dataFile) throws Exception { |
| Files.createFile(dataFile); |
| try (final BufferedWriter writer = Files.newBufferedWriter(dataFile, StandardCharsets.UTF_8)) { |
| for (int i = 1; i <=4; i++) { |
| writer.write(dataFile.getFileName() + " line " + i); |
| writer.newLine(); |
| } |
| } |
| } |
| |
| private static void populateFileWithGzipData(Path dataFile) throws Exception { |
| Files.createFile(dataFile); |
| try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(dataFile.toFile())), StandardCharsets.UTF_8))) { |
| for (int i = 1; i <=4; i++) { |
| writer.write(dataFile.getFileName() + " line " + i); |
| writer.newLine(); |
| } |
| } |
| } |
| |
| protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException { |
| List<Tuple> tuples = new ArrayList<Tuple>(); |
| |
| try { |
| tupleStream.open(); |
| for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) { |
| tuples.add(t); |
| } |
| } finally { |
| tupleStream.close(); |
| } |
| return tuples; |
| } |
  /** Asserts that the tuples' "id" field values match {@code ids} in order; see {@link #assertOrderOf}. */
  protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
    return assertOrderOf(tuples, "id", ids);
  }
| protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception { |
| int i = 0; |
| for(int val : ids) { |
| Tuple t = tuples.get(i); |
| String tip = t.getString(fieldName); |
| if(!tip.equals(Integer.toString(val))) { |
| throw new Exception("Found value:"+tip+" expecting:"+val); |
| } |
| ++i; |
| } |
| return true; |
| } |
| |
| public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception { |
| long lv = (long)tuple.get(fieldName); |
| if(lv != l) { |
| throw new Exception("Longs not equal:"+l+" : "+lv); |
| } |
| |
| return true; |
| } |
| |
| public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception { |
| String actual = (String)tuple.get(fieldName); |
| |
| if( (null == expected && null != actual) || |
| (null != expected && null == actual) || |
| (null != expected && !expected.equals(actual))){ |
| throw new Exception("Longs not equal:"+expected+" : "+actual); |
| } |
| |
| return true; |
| } |
| |
| public boolean assertDouble(Tuple tuple, String fieldName, double d) throws Exception { |
| double dv = tuple.getDouble(fieldName); |
| if(dv != d) { |
| throw new Exception("Doubles not equal:"+d+" : "+dv); |
| } |
| |
| return true; |
| } |
| |
| protected boolean assertMaps(@SuppressWarnings({"rawtypes"})List<Map> maps, int... ids) throws Exception { |
| if(maps.size() != ids.length) { |
| throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size()); |
| } |
| |
| int i=0; |
| for(int val : ids) { |
| @SuppressWarnings({"rawtypes"}) |
| Map t = maps.get(i); |
| String tip = (String)t.get("id"); |
| if(!tip.equals(Integer.toString(val))) { |
| throw new Exception("Found value:"+tip+" expecting:"+val); |
| } |
| ++i; |
| } |
| return true; |
| } |
| |
| private void assertTopicRun(TupleStream stream, String... idArray) throws Exception { |
| long version = -1; |
| int count = 0; |
| List<String> ids = new ArrayList<>(); |
| for(String id : idArray) { |
| ids.add(id); |
| } |
| |
| try { |
| stream.open(); |
| while (true) { |
| Tuple tuple = stream.read(); |
| if (tuple.EOF) { |
| break; |
| } else { |
| ++count; |
| String id = tuple.getString("id"); |
| if (!ids.contains(id)) { |
| throw new Exception("Expecting id in topic run not found:" + id); |
| } |
| |
| long v = tuple.getLong("_version_"); |
| if (v < version) { |
| throw new Exception("Out of order version in topic run:" + v); |
| } |
| } |
| } |
| } finally { |
| stream.close(); |
| } |
| |
| if(count != ids.size()) { |
| throw new Exception("Wrong count in topic run:"+count); |
| } |
| } |
| |
| private void assertTopicSubject(TupleStream stream, String... textArray) throws Exception { |
| long version = -1; |
| int count = 0; |
| List<String> texts = new ArrayList<>(); |
| for(String text : textArray) { |
| texts.add(text); |
| } |
| |
| try { |
| stream.open(); |
| while (true) { |
| Tuple tuple = stream.read(); |
| if (tuple.EOF) { |
| break; |
| } else { |
| ++count; |
| String subject = tuple.getString("subject"); |
| if (!texts.contains(subject)) { |
| throw new Exception("Expecting subject in topic run not found:" + subject); |
| } |
| } |
| } |
| } finally { |
| stream.close(); |
| } |
| } |
| } |