/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.sql;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTableQueryable;
import org.apache.calcite.util.Pair;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.eval.AndEvaluator;
import org.apache.solr.client.solrj.io.eval.EqualToEvaluator;
import org.apache.solr.client.solrj.io.eval.GreaterThanEqualToEvaluator;
import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
import org.apache.solr.client.solrj.io.eval.LessThanEqualToEvaluator;
import org.apache.solr.client.solrj.io.eval.LessThanEvaluator;
import org.apache.solr.client.solrj.io.eval.NotEvaluator;
import org.apache.solr.client.solrj.io.eval.OrEvaluator;
import org.apache.solr.client.solrj.io.eval.RawValueEvaluator;
import org.apache.solr.client.solrj.io.eval.RecursiveBooleanEvaluator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.HavingStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.SortStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.APPROX_COUNT_DISTINCT;
import static org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric.COUNT_DISTINCT;
import static org.apache.solr.common.params.CommonParams.SORT;
/**
* Table based on a Solr collection. Queries against it are executed as Solr
* streaming API {@link TupleStream}s.
*/
class SolrTable extends AbstractQueryableTable implements TranslatableTable {
private static final String DEFAULT_QUERY = "*:*";
private final String collection;
private final SolrSchema schema;
private RelProtoDataType protoRowType;
SolrTable(SolrSchema schema, String collection) {
super(Object[].class);
this.schema = schema;
this.collection = collection;
}
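/**
* Builds a comparator that sorts tuples by the given buckets, using the
* per-bucket sort directions looked up in {@code dirs}.
*/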
private static StreamComparator bucketSortComp(List<Bucket> buckets, Map<String, String> dirs) {
FieldComparator[] comps = new FieldComparator[buckets.size()];
for (int i = 0; i < buckets.size(); i++) {
ComparatorOrder comparatorOrder = ComparatorOrder.fromString(dirs.get(buckets.get(i).toString()));
String sortKey = buckets.get(i).toString();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}
if (comps.length == 1) {
return comps[0];
} else {
return new MultipleFieldComparator(comps);
}
}
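/**
* Builds a comparator that sorts tuples by the given buckets, all in the
* single direction {@code dir}.
*/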
private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
FieldComparator[] comps = new FieldComparator[buckets.length];
for (int i = 0; i < buckets.length; i++) {
ComparatorOrder comparatorOrder = ascDescComp(dir);
String sortKey = buckets[i].toString();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}
if (comps.length == 1) {
return comps[0];
} else {
return new MultipleFieldComparator(comps);
}
}
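/**
* Returns the direction of the first sort item, or "asc" when no ordering is present.
*/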
private static String getSortDirection(List<Pair<String, String>> orders) {
if (orders != null && !orders.isEmpty()) {
return orders.get(0).getValue();
}
return "asc";
}
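/**
* Renders the buckets as a Solr sort clause with a single direction, e.g. "a desc,b desc".
*/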
private static String bucketSort(Bucket[] buckets, String dir) {
StringBuilder buf = new StringBuilder();
boolean comma = false;
for (Bucket bucket : buckets) {
if (comma) {
buf.append(",");
}
buf.append(bucket.toString()).append(" ").append(dir);
comma = true;
}
return buf.toString();
}
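/**
* Renders the buckets as a comma-separated list for the partitionKeys parameter.
*/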
private static String getPartitionKeys(Bucket[] buckets) {
StringBuilder buf = new StringBuilder();
boolean comma = false;
for (Bucket bucket : buckets) {
if (comma) {
buf.append(",");
}
buf.append(bucket.toString());
comma = true;
}
return buf.toString();
}
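/**
* Returns true when the ORDER BY fields match the buckets positionally and every
* sort direction matches {@code direction}.
*/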
private static boolean sortsEqual(Bucket[] buckets, String direction, List<Pair<String, String>> orders) {
if (buckets.length != orders.size()) {
return false;
}
for (int i = 0; i < buckets.length; i++) {
Bucket bucket = buckets[i];
Pair<String, String> order = orders.get(i);
if (!bucket.toString().equals(order.getKey())) {
return false;
}
if (!order.getValue().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
return false;
}
}
return true;
}
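/**
* Converts (field, direction) sort pairs into FieldComparators.
*/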
private static FieldComparator[] getComps(List<Pair<String, String>> orders) {
FieldComparator[] comps = new FieldComparator[orders.size()];
for (int i = 0; i < orders.size(); i++) {
Pair<String, String> sortItem = orders.get(i);
String ordering = sortItem.getValue();
ComparatorOrder comparatorOrder = ascDescComp(ordering);
String sortKey = sortItem.getKey();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}
return comps;
}
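/**
* Maps a direction string to a ComparatorOrder; anything not containing "desc" is ascending.
*/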
private static ComparatorOrder ascDescComp(String s) {
if (s.toLowerCase(Locale.ROOT).contains("desc")) {
return ComparatorOrder.DESCENDING;
} else {
return ComparatorOrder.ASCENDING;
}
}
@Override
public String toString() {
return "SolrTable {" + collection + "}";
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType == null) {
protoRowType = schema.getRelDataType(collection);
}
return protoRowType.apply(typeFactory);
}
private Enumerable<Object> query(final Properties properties) {
return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), null, null, null, null);
}
/**
* Executes a Solr query on the underlying table.
*
* @param properties Connection properties
* @param fields List of fields to project
* @param query A string for the query
* @param orders Sort fields and directions
* @param buckets Fields to group on
* @param metricPairs Aggregate functions paired with the columns they apply to
* @param limit Row limit as a string, or null for no limit
* @param negativeQuery "true" when the query is purely negative and must be anchored to the match-all query
* @param havingPredicate Streaming expression for filtering aggregates, or null
* @param offset Row offset as a string, or null
* @return Enumerator of results
*/
private Enumerable<Object> query(final Properties properties,
@SuppressWarnings("rawtypes") final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final List<String> buckets,
final List<Pair<String, String>> metricPairs,
final String limit,
final String negativeQuery,
final String havingPredicate,
final String offset) {
// SolrParams should be a ModifiableParams instead of a map
boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode"));
boolean negative = Boolean.parseBoolean(negativeQuery);
String q;
if (query == null) {
q = DEFAULT_QUERY;
} else if (negative) {
// A purely negative query matches nothing on its own, so anchor it to the match-all query.
q = DEFAULT_QUERY + " AND " + query;
} else {
q = query;
}
TupleStream tupleStream;
String zk = properties.getProperty("zk");
try {
if (metricPairs.isEmpty() && buckets.isEmpty()) {
tupleStream = handleSelect(zk, collection, q, fields, orders, limit, offset);
} else {
if (buckets.isEmpty()) {
tupleStream = handleStats(zk, collection, q, metricPairs, fields);
} else {
if (mapReduce) {
tupleStream = handleGroupByMapReduce(zk,
collection,
properties,
fields,
q,
orders,
buckets,
metricPairs,
limit,
havingPredicate);
} else {
tupleStream = handleGroupByFacet(zk,
collection,
fields,
q,
orders,
buckets,
metricPairs,
limit,
havingPredicate);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(schema.getSolrClientCache());
tupleStream.setStreamContext(streamContext);
final TupleStream finalStream = tupleStream;
return new AbstractEnumerable<Object>() {
// Use original fields list to make sure only the fields specified are enumerated
public Enumerator<Object> enumerator() {
return new SolrEnumerator(finalStream, fields);
}
};
}
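/**
* Returns the direction of a single sort entry, defaulting to "asc" when unspecified.
*/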
private String getSortDirection(Map.Entry<String, String> order) {
String direction = order.getValue();
return direction == null ? "asc" : direction;
}
private StreamComparator getComp(List<? extends Map.Entry<String, String>> orders) {
FieldComparator[] comps = new FieldComparator[orders.size()];
for (int i = 0; i < orders.size(); i++) {
Map.Entry<String, String> order = orders.get(i);
String direction = getSortDirection(order);
ComparatorOrder comparatorOrder = ComparatorOrder.fromString(direction);
String sortKey = order.getKey();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}
if (comps.length == 1) {
return comps[0];
} else {
return new MultipleFieldComparator(comps);
}
}
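/**
* Builds Metric instances from (function, column) pairs, adding a count(*) metric
* when the list is empty and {@code ifEmptyCount} is true.
*/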
private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs, boolean ifEmptyCount) {
List<Metric> metrics = metricPairs.stream().map(this::getMetric).collect(Collectors.toCollection(ArrayList::new));
if (metrics.isEmpty() && ifEmptyCount) {
metrics.add(new CountMetric());
}
return metrics;
}
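/**
* Maps a SQL aggregate function name to the corresponding streaming {@link Metric}.
*/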
private Metric getMetric(Pair<String, String> metricPair) {
switch (metricPair.getKey()) {
case COUNT_DISTINCT:
return new CountDistinctMetric(metricPair.getValue());
case APPROX_COUNT_DISTINCT:
return new CountDistinctMetric(metricPair.getValue(), true);
case "COUNT":
return new CountMetric(metricPair.getValue());
case "SUM":
case "$SUM0":
return new SumMetric(metricPair.getValue());
case "MIN":
return new MinMetric(metricPair.getValue());
case "MAX":
return new MaxMetric(metricPair.getValue());
case "AVG":
return new MeanMetric(metricPair.getValue());
default:
throw new IllegalArgumentException(metricPair.getKey());
}
}
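/**
* Handles a plain SELECT (no aggregations). Limited queries go through the default
* /select handler with rows=limit(+offset); unlimited queries stream from /export.
*/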
@SuppressWarnings({"rawtypes"})
private TupleStream handleSelect(String zk,
String collection,
String query,
List<Map.Entry<String, Class>> fields,
List<Pair<String, String>> orders,
String limit,
String offset) throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CommonParams.Q, query);
//Validate the fields
for (Map.Entry<String, Class> entry : fields) {
String fname = entry.getKey();
if (limit == null && "score".equals(fname)) {
throw new IOException("score is not a valid field for unlimited queries.");
}
if (fname.contains("*")) {
throw new IOException("* is not supported for column selection.");
}
}
String fl = getFields(fields);
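// An explicit ORDER BY wins; otherwise /export needs a sort (_version_ desc), and
// limited queries default to relevance order (score desc).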
if (!orders.isEmpty()) {
params.add(SORT, getSort(orders));
} else {
if (limit == null) {
params.add(SORT, "_version_ desc");
fl = fl + ",_version_";
} else {
params.add(SORT, "score desc");
if (!fl.contains("score")) {
fl = fl + ",score";
}
}
}
params.add(CommonParams.FL, fl);
if (offset != null && limit == null) {
throw new IOException("OFFSET without LIMIT not supported by Solr! Specify desired limit using 'FETCH NEXT <LIMIT> ROWS ONLY'");
}
if (limit != null) {
int limitInt = Integer.parseInt(limit);
// if there's an offset, then we need to fetch offset + limit rows from each shard and then sort accordingly
LimitStream limitStream;
if (offset != null) {
int offsetInt = Integer.parseInt(offset);
int rows = limitInt + offsetInt;
params.add(CommonParams.START, "0"); // tricky ... we need all rows up to limit + offset
params.add(CommonParams.ROWS, String.valueOf(rows));
// re-sort all the streams back from the shards
StreamComparator streamSorter = new MultipleFieldComparator(getComps(orders));
limitStream = new LimitStream(new SortStream(new CloudSolrStream(zk, collection, params), streamSorter), limitInt, offsetInt);
} else {
params.add(CommonParams.ROWS, limit);
limitStream = new LimitStream(new CloudSolrStream(zk, collection, params), limitInt);
}
return limitStream;
} else {
params.add(CommonParams.QT, "/export");
return new CloudSolrStream(zk, collection, params);
}
}
private String getSort(List<Pair<String, String>> orders) {
StringBuilder buf = new StringBuilder();
for (Pair<String, String> pair : orders) {
if (buf.length() > 0) {
buf.append(",");
}
buf.append(pair.getKey()).append(" ").append(pair.getValue());
}
return buf.toString();
}
private String getSingleSort(Pair<String, String> order) {
StringBuilder buf = new StringBuilder();
buf.append(order.getKey()).append(" ").append(order.getValue());
return buf.toString();
}
@SuppressWarnings({"rawtypes"})
private String getFields(List<Map.Entry<String, Class>> fields) {
StringBuilder buf = new StringBuilder();
for (Map.Entry<String, Class> field : fields) {
if (buf.length() > 0) {
buf.append(",");
}
buf.append(field.getKey());
}
return buf.toString();
}
private String getFields(Set<String> fieldSet) {
StringBuilder buf = new StringBuilder();
for (String field : fieldSet) {
if (buf.length() > 0) {
buf.append(",");
}
buf.append(field);
}
return buf.toString();
}
@SuppressWarnings({"unchecked", "rawtypes"})
private Set<String> getFieldSet(Metric[] metrics, List<Map.Entry<String, Class>> fields) {
HashSet set = new HashSet<>();
for (Metric metric : metrics) {
for (String column : metric.getColumns()) {
set.add(column);
}
}
for (Map.Entry<String, Class> field : fields) {
if (field.getKey().indexOf('(') == -1) {
set.add(field.getKey());
}
}
return set;
}
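/**
* Handles GROUP BY in map_reduce mode: tuples are streamed sorted from /export,
* rolled up into aggregates, filtered by any HAVING predicate (in parallel across
* workers when numWorkers > 1), and finally ranked or limited.
*/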
@SuppressWarnings({"rawtypes"})
private TupleStream handleGroupByMapReduce(String zk,
String collection,
Properties properties,
final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final List<String> _buckets,
final List<Pair<String, String>> metricPairs,
final String limit,
final String havingPredicate) throws IOException {
Map<String, Class> fmap = new HashMap<>();
for (Map.Entry<String, Class> entry : fields) {
fmap.put(entry.getKey(), entry.getValue());
}
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
Bucket[] buckets = buildBuckets(_buckets, fields);
Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
if (metrics.length == 0) {
return handleSelectDistinctMapReduce(zk, collection, properties, fields, query, orders, buckets, limit);
} else {
for (Metric metric : metrics) {
Class c = fmap.get(metric.getIdentifier());
if (Long.class.equals(c)) {
metric.outputLong = true;
}
}
}
Set<String> fieldSet = getFieldSet(metrics, fields);
String fl = getFields(fieldSet);
String sortDirection = getSortDirection(orders);
String sort = bucketSort(buckets, sortDirection);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.FL, fl);
params.set(CommonParams.Q, query);
params.set(CommonParams.WT, CommonParams.JAVABIN);
//Always use the /export handler for Group By queries because they require exporting full result sets.
params.set(CommonParams.QT, "/export");
if (numWorkers > 1) {
params.set("partitionKeys", getPartitionKeys(buckets));
}
params.set(SORT, sort);
CloudSolrStream cstream = new CloudSolrStream(zk, collection, params);
TupleStream tupleStream = new RollupStream(cstream, buckets, metrics);
StreamFactory factory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("and", AndEvaluator.class)
.withFunctionName("or", OrEvaluator.class)
.withFunctionName("not", NotEvaluator.class)
.withFunctionName("eq", EqualToEvaluator.class)
.withFunctionName("gt", GreaterThanEvaluator.class)
.withFunctionName("lt", LessThanEvaluator.class)
.withFunctionName("val", RawValueEvaluator.class)
.withFunctionName("lteq", LessThanEqualToEvaluator.class)
.withFunctionName("having", HavingStream.class)
.withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
if (havingPredicate != null) {
RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator) factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
tupleStream = new HavingStream(tupleStream, booleanOperation);
}
if (numWorkers > 1) {
// Do the rollups in parallel
// Maintain the sort of the Tuples coming from the workers.
StreamComparator comp = bucketSortComp(buckets, sortDirection);
@SuppressWarnings("resource") final ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
parallelStream.setStreamFactory(factory);
tupleStream = parallelStream;
}
//TODO: Currently we are not pushing down the having clause.
// We need to push down the having clause to ensure that LIMIT does not cut off records prior to the having filter.
if (orders != null && !orders.isEmpty()) {
if (!sortsEqual(buckets, sortDirection, orders)) {
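// RankStream requires a finite size, so default to 100 rows when no LIMIT was given.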
int lim = (limit == null) ? 100 : Integer.parseInt(limit);
StreamComparator comp = getComp(orders);
//Rank the Tuples
//If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
//Providing a true Top or Bottom.
tupleStream = new RankStream(tupleStream, lim, comp);
} else {
// Sort is the same as the underlying stream,
// so we only need to limit the result, not rank it.
if (limit != null) {
tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
}
}
} else {
//No order by, check for limit
if (limit != null) {
tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
}
}
return tupleStream;
}
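/**
* Builds the Bucket array in field-list order for the requested group-by fields.
*/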
@SuppressWarnings({"rawtypes"})
private Bucket[] buildBuckets(List<String> buckets, List<Map.Entry<String, Class>> fields) {
Bucket[] bucketsArray = new Bucket[buckets.size()];
int i = 0;
for (Map.Entry<String, Class> field : fields) {
String fieldName = field.getKey();
if (buckets.contains(fieldName)) {
bucketsArray[i++] = new Bucket(fieldName);
}
}
return bucketsArray;
}
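/**
* Handles GROUP BY in facet mode by pushing the aggregation down into a FacetStream,
* then applying any HAVING predicate and the limit.
*/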
@SuppressWarnings({"rawtypes"})
private TupleStream handleGroupByFacet(String zkHost,
String collection,
final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final List<String> bucketFields,
final List<Pair<String, String>> metricPairs,
final String lim,
final String havingPredicate) throws IOException {
Map<String, Class> fmap = new HashMap<>();
for (Map.Entry<String, Class> f : fields) {
fmap.put(f.getKey(), f.getValue());
}
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add(CommonParams.Q, query);
Bucket[] buckets = buildBuckets(bucketFields, fields);
Metric[] metrics = buildMetrics(metricPairs, true).toArray(new Metric[0]);
if (metrics.length == 0) {
metrics = new Metric[1];
metrics[0] = new CountMetric();
} else {
for (Metric metric : metrics) {
Class c = fmap.get(metric.getIdentifier());
if (Long.class.equals(c)) {
metric.outputLong = true;
}
}
}
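// FacetStream needs a bucket limit; default to 1000 buckets when no LIMIT was given.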
int limit = lim != null ? Integer.parseInt(lim) : 1000;
FieldComparator[] sorts;
if (orders == null || orders.isEmpty()) {
sorts = new FieldComparator[buckets.length];
for (int i = 0; i < sorts.length; i++) {
sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
}
} else {
sorts = getComps(orders);
}
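// Overfetch 25% beyond the limit so distributed facets still return accurate top buckets.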
int overfetch = (int) (limit * 1.25);
TupleStream tupleStream = new FacetStream(zkHost,
collection,
solrParams,
buckets,
metrics,
sorts,
overfetch);
StreamFactory factory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("and", AndEvaluator.class)
.withFunctionName("or", OrEvaluator.class)
.withFunctionName("not", NotEvaluator.class)
.withFunctionName("eq", EqualToEvaluator.class)
.withFunctionName("val", RawValueEvaluator.class)
.withFunctionName("gt", GreaterThanEvaluator.class)
.withFunctionName("lt", LessThanEvaluator.class)
.withFunctionName("lteq", LessThanEqualToEvaluator.class)
.withFunctionName("gteq", GreaterThanEqualToEvaluator.class);
if (havingPredicate != null) {
RecursiveBooleanEvaluator booleanOperation = (RecursiveBooleanEvaluator) factory.constructEvaluator(StreamExpressionParser.parse(havingPredicate));
tupleStream = new HavingStream(tupleStream, booleanOperation);
}
if (lim != null) {
tupleStream = new LimitStream(tupleStream, limit);
}
return tupleStream;
}
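/**
* Handles SELECT DISTINCT in map_reduce mode: sorted tuples are streamed from
* /export and de-duplicated by a UniqueStream, optionally in parallel across workers.
*/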
@SuppressWarnings({"rawtypes"})
private TupleStream handleSelectDistinctMapReduce(final String zkHost,
final String collection,
final Properties properties,
final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final Bucket[] buckets,
final String limit) throws IOException {
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
String fl = getFields(fields);
String sort = null;
StreamEqualitor ecomp = null;
StreamComparator comp = null;
if (orders != null && !orders.isEmpty()) {
StreamComparator[] adjustedSorts = adjustSorts(orders, buckets);
// Because of the way adjustSorts works we know that each FieldComparator has a single
// field name. For this reason we can just look at the leftFieldName
FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
StringBuilder buf = new StringBuilder();
for (int i = 0; i < adjustedSorts.length; i++) {
FieldComparator fieldComparator = (FieldComparator) adjustedSorts[i];
fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
if (i > 0) {
buf.append(",");
}
buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
}
sort = buf.toString();
if (adjustedSorts.length == 1) {
ecomp = fieldEqualitors[0];
comp = adjustedSorts[0];
} else {
ecomp = new MultipleFieldEqualitor(fieldEqualitors);
comp = new MultipleFieldComparator(adjustedSorts);
}
} else {
StringBuilder sortBuf = new StringBuilder();
FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
StreamComparator[] streamComparators = new StreamComparator[buckets.length];
for (int i = 0; i < buckets.length; i++) {
equalitors[i] = new FieldEqualitor(buckets[i].toString());
streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
if (i > 0) {
sortBuf.append(',');
}
sortBuf.append(buckets[i].toString()).append(" asc");
}
sort = sortBuf.toString();
if (equalitors.length == 1) {
ecomp = equalitors[0];
comp = streamComparators[0];
} else {
ecomp = new MultipleFieldEqualitor(equalitors);
comp = new MultipleFieldComparator(streamComparators);
}
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.FL, fl);
params.set(CommonParams.Q, query);
params.set(CommonParams.WT, CommonParams.JAVABIN);
//Always use the /export handler for Distinct queries because they require exporting full result sets.
params.set(CommonParams.QT, "/export");
if (numWorkers > 1) {
params.set("partitionKeys", getPartitionKeys(buckets));
}
params.set(SORT, sort);
CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
TupleStream tupleStream = new UniqueStream(cstream, ecomp);
if (numWorkers > 1) {
// Do the unique in parallel
// Maintain the sort of the Tuples coming from the workers.
@SuppressWarnings("resource") final ParallelStream parallelStream = new ParallelStream(zkHost, collection, tupleStream, numWorkers, comp);
StreamFactory factory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("unique", UniqueStream.class);
parallelStream.setStreamFactory(factory);
tupleStream = parallelStream;
}
if (limit != null) {
tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
}
return tupleStream;
}
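/**
* Ensures every bucket field participates in the sort, as required by the unique
* stream: sort fields must be bucket fields, and buckets missing from the ORDER BY
* are appended using the direction of the last explicit sort.
*/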
private StreamComparator[] adjustSorts(List<Pair<String, String>> orders, Bucket[] buckets) throws IOException {
List<FieldComparator> adjustedSorts = new ArrayList<>();
Set<String> bucketFields = new HashSet<>();
Set<String> sortFields = new HashSet<>();
ComparatorOrder comparatorOrder = ComparatorOrder.ASCENDING;
for (Pair<String, String> order : orders) {
sortFields.add(order.getKey());
adjustedSorts.add(new FieldComparator(order.getKey(), ascDescComp(order.getValue())));
comparatorOrder = ascDescComp(order.getValue());
}
for (Bucket bucket : buckets) {
bucketFields.add(bucket.toString());
}
for (String sf : sortFields) {
if (!bucketFields.contains(sf)) {
throw new IOException("All sort fields must be in the field list.");
}
}
//Add sort fields if needed
if (sortFields.size() < buckets.length) {
for (Bucket bucket : buckets) {
String b = bucket.toString();
if (!sortFields.contains(b)) {
adjustedSorts.add(new FieldComparator(bucket.toString(), comparatorOrder));
}
}
}
return adjustedSorts.toArray(new FieldComparator[0]);
}
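/**
* Handles aggregations without a GROUP BY by pushing them down into a StatsStream.
*/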
@SuppressWarnings({"rawtypes"})
private TupleStream handleStats(String zk,
String collection,
String query,
List<Pair<String, String>> metricPairs,
List<Map.Entry<String, Class>> fields) throws IOException {
Map<String, Class> fmap = new HashMap<>();
for (Map.Entry<String, Class> entry : fields) {
fmap.put(entry.getKey(), entry.getValue());
}
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add(CommonParams.Q, query);
Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
for (Metric metric : metrics) {
Class c = fmap.get(metric.getIdentifier());
if (Long.class.equals(c)) {
metric.outputLong = true;
}
}
return new StatsStream(zk, collection, solrParams, metrics);
}
@Override
public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {
return new SolrQueryable<>(queryProvider, schema, this, tableName);
}
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
return new SolrTableScan(cluster, cluster.traitSetOf(SolrRel.CONVENTION), relOptTable, this, null);
}
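/** Implementation of {@link Queryable} based on a {@link SolrTable}. */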
@SuppressWarnings("WeakerAccess")
public static class SolrQueryable<T> extends AbstractTableQueryable<T> {
SolrQueryable(QueryProvider queryProvider, SchemaPlus schema, SolrTable table, String tableName) {
super(queryProvider, schema, table, tableName);
}
@Override
public Enumerator<T> enumerator() {
@SuppressWarnings("unchecked") final Enumerable<T> enumerable = (Enumerable<T>) getTable().query(getProperties());
return enumerable.enumerator();
}
private SolrTable getTable() {
return (SolrTable) table;
}
private Properties getProperties() {
return schema.unwrap(SolrSchema.class).properties;
}
/**
* Called via code-generation.
*
* @see SolrMethod#SOLR_QUERYABLE_QUERY
*/
@SuppressWarnings({"rawtypes", "UnusedDeclaration"})
public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, String query, List<Pair<String, String>> order,
List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery, String havingPredicate, String offset) {
return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset);
}
}
}