blob: 187500ecdc6e3fff9584ec0bf6d679a021b358fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.io.stream.metrics.WeightedSumMetric;
/**
* Indicates the underlying stream source supports parallelizing metrics computation across collections
* using a rollup of metrics from each collection.
*/
public interface ParallelMetricsRollup {
/**
* Given a list of collections, return an array of TupleStream for each partition.
*
* @param partitions A list of collections to parallelize metrics computation across.
* @return An array of TupleStream for each partition requested.
* @throws IOException if an error occurs while constructing the underlying TupleStream for a partition.
*/
TupleStream[] parallelize(List<String> partitions) throws IOException;
/**
* Get the rollup for the parallelized streams that is sorted based on the original (non-parallel) sort order.
*
* @param plistStream A parallel list stream to fetch metrics from each partition concurrently
* @param rollupMetrics An array of metrics to rollup
* @return A rollup over parallelized streams that provide metrics; this is typically a SortStream.
* @throws IOException if an error occurs while reading from the sorted stream
*/
TupleStream getSortedRollupStream(ParallelListStream plistStream, Metric[] rollupMetrics) throws IOException;
/**
* Given a list of partitions (collections), open a select stream that projects the dimensions and
* metrics produced by rolling up over a parallelized group of streams. If it's not possible to rollup
* the metrics produced by the underlying metrics stream, this method returns Optional.empty.
*
* @param context The current streaming expression context
* @param partitions A list of collections to parallelize metrics computation across.
* @param metrics A list of metrics to rollup.
* @return Either a TupleStream that performs a rollup over parallelized streams or empty if parallelization is not possible.
* @throws IOException if an error occurs reading tuples from the parallelized streams
*/
default Optional<TupleStream> openParallelStream(StreamContext context, List<String> partitions, Metric[] metrics) throws IOException {
Optional<Metric[]> maybeRollupMetrics = getRollupMetrics(metrics);
if (!maybeRollupMetrics.isPresent())
return Optional.empty(); // some metric is incompatible with doing a rollup over the plist results
TupleStream parallelStream = getSortedRollupStream(new ParallelListStream(parallelize(partitions)), maybeRollupMetrics.get());
parallelStream.setStreamContext(context);
parallelStream.open();
return Optional.of(parallelStream);
}
/**
* Either an array of metrics that can be parallelized and rolled up or empty.
*
* @param metrics The list of metrics that we want to parallelize.
* @return Either an array of metrics that can be parallelized and rolled up or empty.
*/
default Optional<Metric[]> getRollupMetrics(Metric[] metrics) {
Metric[] rollup = new Metric[metrics.length];
CountMetric count = null;
for (int m = 0; m < rollup.length; m++) {
Metric nextRollup;
Metric next = metrics[m];
if (next instanceof SumMetric) {
// sum of sums
nextRollup = new SumMetric(next.getIdentifier());
nextRollup.outputLong = next.outputLong;
} else if (next instanceof MinMetric) {
// min of mins
nextRollup = new MinMetric(next.getIdentifier());
nextRollup.outputLong = next.outputLong;
} else if (next instanceof MaxMetric) {
// max of max
nextRollup = new MaxMetric(next.getIdentifier());
nextRollup.outputLong = next.outputLong;
} else if (next instanceof CountMetric) {
// sum of counts
nextRollup = new SumMetric(next.getIdentifier());
nextRollup.outputLong = next.outputLong;
count = (CountMetric) next;
} else if (next instanceof MeanMetric) {
// WeightedSumMetric must have a count to compute the weighted avg. rollup from ...
// if the user is not requesting count, then we can't parallelize
if (count == null) {
// just look past the current position
for (int n = m + 1; n < metrics.length; n++) {
if (metrics[n] instanceof CountMetric) {
count = (CountMetric) metrics[n];
break;
}
}
}
if (count != null) {
nextRollup = new WeightedSumMetric(next.getIdentifier(), count.getIdentifier());
} else {
return Optional.empty(); // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric)
}
} else if (next instanceof CountDistinctMetric) {
// rollup of count distinct is the max across the tiers
nextRollup = new MaxMetric(next.getIdentifier());
nextRollup.outputLong = next.outputLong;
} else {
return Optional.empty(); // can't parallelize this expr!
}
rollup[m] = nextRollup;
}
return Optional.of(rollup);
}
}