blob: 2fa0dd093357cc50d4598b22d56ab5e2fc1da74a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.AddEvaluator;
import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
import org.apache.solr.client.solrj.io.eval.IfThenElseEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
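/**
 * Tests the {@code select} streaming expression when it is combined with evaluators such as {@code add}.
 * The inner {@code search} function is backed by CloudSolrStream. Under the covers CloudSolrStream uses
 * SolrStream, so SolrStream is exercised through these tests as well.
 */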
@Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
// Name the tests use to address the index. setupCluster() makes it either the real collection
// name or an alias pointing at "collection1_collection", depending on useAlias.
private static final String COLLECTIONORALIAS = "collection1";
// Timeout handed to waitForRecoveriesToFinish in setupCluster(); its unit is whatever DEFAULT_TIMEOUT uses.
private static final int TIMEOUT = DEFAULT_TIMEOUT;
// Field name under which the document id is supplied when test documents are added.
private static final String id = "id";
// Picked at random in setupCluster(): true when the collection is reached through an alias.
private static boolean useAlias;
/**
 * Configures the test cluster with the "conf" and "ml" configsets, creates the collection used by
 * the tests and waits for recoveries to finish. On a random coin flip the collection gets a
 * "_collection" suffix and an alias named {@code COLLECTIONORALIAS} is created in front of it.
 */
@BeforeClass
public static void setupCluster() throws Exception {
  configureCluster(4)
      .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
      .addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
      .configure();

  // Decide once per class whether tests address the collection directly or through an alias.
  useAlias = random().nextBoolean();
  final String collection = useAlias ? COLLECTIONORALIAS + "_collection" : COLLECTIONORALIAS;

  CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
      .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
      .process(cluster.getSolrClient());
  AbstractDistribZkTestBase.waitForRecoveriesToFinish(
      collection, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);

  if (!useAlias) {
    return;
  }
  // The alias carries the well-known name so the tests never need to know which mode was chosen.
  CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
}
/** Empties the index before each test by deleting everything that matches {@code *:*} and committing. */
@Before
public void cleanIndex() throws Exception {
  UpdateRequest deleteEverything = new UpdateRequest();
  deleteEverything.deleteByQuery("*:*");
  deleteEverything.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
/**
 * Indexes one document and checks that {@code select} emits the {@code id} field plus a
 * {@code result} field computed by {@code add(b_i,c_d)}, and that the fields which were only
 * fetched by the inner {@code search} are not present in the output tuples.
 */
@Test
public void testSelectWithEvaluatorsStream() throws Exception {
  new UpdateRequest()
      .add(id, "1", "a_s", "foo", "b_i", "1", "c_d", "3.3", "d_b", "true")
      .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

  String clause;
  TupleStream stream;
  List<Tuple> tuples;
  StreamContext streamContext = new StreamContext();
  SolrClientCache solrClientCache = new SolrClientCache();
  streamContext.setSolrClientCache(solrClientCache);

  // Use the shared constant rather than a literal so the factory, the expression and the
  // indexing/cleanup code above always agree on the collection (or alias) name.
  StreamFactory factory = new StreamFactory()
      .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
      .withFunctionName("search", CloudSolrStream.class)
      .withFunctionName("select", SelectStream.class)
      .withFunctionName("add", AddEvaluator.class)
      .withFunctionName("if", IfThenElseEvaluator.class)
      .withFunctionName("gt", GreaterThanEvaluator.class)
      ;

  try {
    // Basic test
    clause = "select("
        + "id,"
        + "add(b_i,c_d) as result,"
        + "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
        + ")";
    stream = factory.constructStream(clause);
    stream.setStreamContext(streamContext);
    tuples = getTuples(stream);

    assertFields(tuples, "id", "result");
    assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
    assertEquals(1, tuples.size());
    assertDouble(tuples.get(0), "result", 4.3);
    assertEquals(4.3, tuples.get(0).get("result"));
  } finally {
    // The client cache is created by this test, so it is this test's job to release it.
    solrClientCache.close();
  }
}
/**
 * Opens the stream, reads tuples until the EOF marker tuple is seen, and returns the tuples read
 * before it. The stream is always closed, including when opening or reading fails.
 *
 * @param tupleStream the stream to drain; it is closed before this method returns
 * @return the tuples read, in the order the stream produced them (EOF tuple excluded)
 * @throws IOException if opening, reading or closing the stream fails
 */
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
  List<Tuple> tuples = new ArrayList<>();
  // try-with-resources guarantees close() runs on failure and keeps the original exception as
  // the primary one (a failure in close() is attached as suppressed rather than masking it).
  try (TupleStream openedStream = tupleStream) {
    openedStream.open();
    for (Tuple t = openedStream.read(); !t.EOF; t = openedStream.read()) {
      tuples.add(t);
    }
  }
  return tuples;
}
/**
 * Convenience form of {@code assertOrderOf} that compares against the "id" field.
 *
 * @return {@code true} when the check passes; a failure is reported by exception
 */
protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
  final String idField = "id";
  return assertOrderOf(tuples, idField, ids);
}
/**
 * Verifies that, position by position, the string value of {@code fieldName} in the leading
 * tuples equals the decimal form of the corresponding entry of {@code ids}. Only the first
 * {@code ids.length} tuples are inspected.
 *
 * @param tuples    tuples to check
 * @param fieldName field whose string value is compared
 * @param ids       expected values, in order
 * @return {@code true} when every inspected position matches
 * @throws Exception if there are fewer tuples than expected ids, or a value does not match
 */
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception {
  // Report a short result list as an assertion failure instead of an IndexOutOfBoundsException.
  if (tuples.size() < ids.length) {
    throw new Exception("Expected at least " + ids.length + " tuples but found " + tuples.size());
  }
  for (int i = 0; i < ids.length; i++) {
    String expected = Integer.toString(ids[i]);
    String tip = tuples.get(i).getString(fieldName);
    // Compare from the non-null side so a null field value reads as a mismatch, not an NPE.
    if (!expected.equals(tip)) {
      throw new Exception("Found value:" + tip + " expecting:" + ids[i]);
    }
  }
  return true;
}
/**
 * For each position in {@code ids}, takes the tuple at that position, reads its "group" list of
 * maps and compares the "id" entry of the first map against the expected value.
 *
 * @return {@code true} when every position matches
 * @throws Exception on the first mismatch
 */
protected boolean assertMapOrder(List<Tuple> tuples, int... ids) throws Exception {
  for (int pos = 0; pos < ids.length; pos++) {
    final int expectedId = ids[pos];
    @SuppressWarnings({"rawtypes"})
    List<Map> group = tuples.get(pos).getMaps("group");
    int actualId = (int) group.get(0).get("id");
    if (actualId == expectedId) {
      continue;
    }
    throw new Exception("Found value:" + actualId + " expecting:" + expectedId);
  }
  return true;
}
/**
 * Requires every tuple to contain every one of the named fields.
 *
 * @return {@code true} when all fields are present in all tuples
 * @throws Exception naming the first field that is missing from a tuple
 */
protected boolean assertFields(List<Tuple> tuples, String... fields) throws Exception {
  for (Tuple candidate : tuples) {
    for (String required : fields) {
      boolean present = candidate.getFields().containsKey(required);
      if (present) {
        continue;
      }
      throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", required));
    }
  }
  return true;
}
/**
 * Requires that none of the named fields appears in any tuple.
 *
 * @return {@code true} when no tuple contains any of the fields
 * @throws Exception naming the first forbidden field that is found in a tuple
 */
protected boolean assertNotFields(List<Tuple> tuples, String... fields) throws Exception {
  for (Tuple candidate : tuples) {
    for (String forbidden : fields) {
      boolean absent = !candidate.getFields().containsKey(forbidden);
      if (absent) {
        continue;
      }
      throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", forbidden));
    }
  }
  return true;
}
/**
 * Reads the list stored under "tuples" in the given tuple and checks that the "id" entry of the
 * map at each position, narrowed from Long to int, equals the expected value at that position.
 *
 * @return {@code true} when every position matches
 * @throws Exception on the first mismatch
 */
protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
  List<?> members = (List<?>) tuple.get("tuples");
  for (int pos = 0; pos < ids.length; pos++) {
    Map<?, ?> member = (Map<?, ?>) members.get(pos);
    int actual = ((Long) member.get("id")).intValue();
    if (actual != ids[pos]) {
      throw new Exception("Found value:" + actual + " expecting:" + ids[pos]);
    }
  }
  return true;
}
/**
 * Checks that the named field, read from the tuple and unboxed as a long, equals {@code l}.
 *
 * @return {@code true} when the values are equal
 * @throws Exception when they differ
 */
public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
  final long actual = (long) tuple.get(fieldName);
  if (actual == l) {
    return true;
  }
  throw new Exception("Longs not equal:" + l + " : " + actual);
}
/**
 * Checks that the named field, read from the tuple and unboxed as a double, is exactly equal to
 * {@code expectedValue} (no tolerance is applied).
 *
 * @return {@code true} when the values are equal
 * @throws Exception when they differ
 */
public boolean assertDouble(Tuple tuple, String fieldName, double expectedValue) throws Exception {
  final double actual = (double) tuple.get(fieldName);
  if (expectedValue == actual) {
    return true;
  }
  throw new Exception("Doubles not equal:" + actual + " : " + expectedValue);
}
/**
 * Checks that the named field of the tuple, cast to String, equals {@code expected}. Two nulls
 * are considered equal; a null on only one side is a mismatch.
 *
 * @param tuple     tuple to read from
 * @param fieldName field whose value is compared
 * @param expected  expected value, may be {@code null}
 * @return {@code true} when the values are equal
 * @throws Exception when they differ
 */
public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
  String actual = (String) tuple.get(fieldName);
  boolean matches = (expected == null) ? actual == null : expected.equals(actual);
  if (!matches) {
    // The message names the type actually compared (it previously said "Longs").
    throw new Exception("Strings not equal:" + expected + " : " + actual);
  }
  return true;
}
/**
 * Requires {@code maps} to have exactly as many entries as {@code ids} and the String "id" entry
 * of each map to equal the decimal form of the expected id at the same position.
 *
 * @return {@code true} when sizes and all ids match
 * @throws Exception on a size difference or on the first id mismatch
 */
protected boolean assertMaps(@SuppressWarnings({"rawtypes"}) List<Map> maps, int... ids) throws Exception {
  final int actualCount = maps.size();
  if (actualCount != ids.length) {
    throw new Exception("Expected id count != actual map count:" + ids.length + ":" + actualCount);
  }
  for (int pos = 0; pos < ids.length; pos++) {
    @SuppressWarnings({"rawtypes"})
    Map entry = maps.get(pos);
    String actualId = (String) entry.get("id");
    String expectedId = Integer.toString(ids[pos]);
    if (!actualId.equals(expectedId)) {
      throw new Exception("Found value:" + actualId + " expecting:" + ids[pos]);
    }
  }
  return true;
}
/**
 * Requires {@code list} to have the same length as {@code vals} and each element to be equal to
 * the expected value at the same position. Two nulls at the same position count as equal.
 *
 * @param list actual values
 * @param vals expected values, in order
 * @return {@code true} when sizes and all elements match
 * @throws Exception on a size difference or on the first element mismatch
 */
private boolean assertList(@SuppressWarnings({"rawtypes"}) List list, Object... vals) throws Exception {
  if (list.size() != vals.length) {
    throw new Exception("Lists are not the same size:" + list.size() + " : " + vals.length);
  }
  for (int i = 0; i < vals.length; i++) {
    Object a = list.get(i);
    Object b = vals[i];
    // Null-safe comparison: a null element is a reportable mismatch (or a match against a
    // null expectation) rather than a NullPointerException.
    boolean same = (a == null) ? b == null : a.equals(b);
    if (!same) {
      throw new Exception("List items not equals:" + a + " : " + b);
    }
  }
  return true;
}
}