blob: bec388bf2cc9ab01175929e95ba554400e6540f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.analytics.facet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.solr.analytics.AnalyticsDriver;
import org.apache.solr.analytics.function.ExpressionCalculator;
import org.apache.solr.analytics.function.ReductionCollectionManager;
import org.apache.solr.analytics.function.ReductionCollectionManager.ReductionDataCollection;
import org.apache.solr.analytics.function.reduction.data.ReductionData;
import org.apache.solr.analytics.util.AnalyticsResponseHeadings;
import org.apache.solr.analytics.value.StringValueStream;
/**
* Representation of one layer of a Pivot Facet. A PivotFacet node is individually sortable,
* and is collected during the streaming phase of the {@link AnalyticsDriver}.
*/
public abstract class PivotNode<T> extends SortableFacet implements Consumer<String> {
private StringValueStream expression;
protected Map<String,T> currentPivot;
public PivotNode(String name, StringValueStream expression) {
super(name);
this.expression = expression;
}
/**
* Determine which facet values match the current document. Add the {@link ReductionDataCollection}s of the relevant facet values
* to the targets of the streaming {@link ReductionCollectionManager} so that they are updated with the current document's data.
*/
public void addFacetValueCollectionTargets(Map<String,T> pivot) {
currentPivot = pivot;
expression.streamStrings(this);
}
/**
* Import the shard data from a bit-stream for the given pivot, exported by the {@link #exportPivot} method
* in the each of the collection's shards.
*
* @param input The bit-stream to import the data from
* @param pivot the values for this pivot node and the pivot children (if they exist)
* @throws IOException if an exception occurs while reading from the {@link DataInput}
*/
public void importPivot(DataInput input, Map<String,T> pivot) throws IOException {
int size = input.readInt();
currentPivot = pivot;
for (int i = 0; i < size; ++i) {
importPivotValue(input, input.readUTF());
}
}
/**
* Import the next pivot value's set of {@link ReductionData} and children's {@link ReductionData} if they exist.
*
* @param input the bit-stream to import the reduction data from
* @param pivotValue the next pivot value
* @throws IOException if an exception occurs while reading from the input
*/
protected abstract void importPivotValue(DataInput input, String pivotValue) throws IOException;
/**
* Export the shard data through a bit-stream for the given pivot,
* to be imported by the {@link #importPivot} method in the originating shard.
*
* @param output The bit-stream to output the data through
* @param pivot the values for this pivot node and the pivot children (if they exist)
* @throws IOException if an exception occurs while writing to the {@link DataOutput}
*/
public void exportPivot(DataOutput output, Map<String,T> pivot) throws IOException {
output.writeInt(pivot.size());
for (Map.Entry<String, T> entry : pivot.entrySet()) {
output.writeUTF(entry.getKey());
exportPivotValue(output, entry.getValue());
}
}
/**
* Export the given pivot data, containing {@link ReductionData} and pivot children if they exist.
*
* @param output the bit-stream to output the reduction data to
* @param pivotData the next pivot value data
* @throws IOException if an exception occurs while reading from the input
*/
protected abstract void exportPivotValue(DataOutput output, T pivotData) throws IOException;
/**
* Create the response of the facet to be returned in the overall analytics response.
*
* @param pivot the pivot to create a response for
* @return the response of the facet
*/
public abstract Iterable<Map<String,Object>> getPivotedResponse(Map<String,T> pivot);
/**
* A pivot node that has no pivot children.
*/
public static class PivotLeaf extends PivotNode<ReductionDataCollection> {
public PivotLeaf(String name, StringValueStream expression) {
super(name, expression);
}
@Override
public void accept(String pivotValue) {
ReductionDataCollection collection = currentPivot.get(pivotValue);
if (collection == null) {
collection = collectionManager.newDataCollectionTarget();
currentPivot.put(pivotValue, collection);
} else {
collectionManager.addCollectTarget(collection);
}
}
@Override
protected void importPivotValue(DataInput input, String pivotValue) throws IOException {
ReductionDataCollection dataCollection = currentPivot.get(pivotValue);
if (dataCollection == null) {
currentPivot.put(pivotValue, collectionManager.newDataCollectionIO());
} else {
collectionManager.prepareReductionDataIO(dataCollection);
}
collectionManager.mergeData();
}
@Override
protected void exportPivotValue(DataOutput output, ReductionDataCollection pivotData) throws IOException {
collectionManager.prepareReductionDataIO(pivotData);
collectionManager.exportData();
}
@Override
public Iterable<Map<String,Object>> getPivotedResponse(Map<String,ReductionDataCollection> pivot) {
final List<FacetBucket> facetResults = new ArrayList<>();
pivot.forEach((facetVal, dataCol) -> {
collectionManager.setData(dataCol);
facetResults.add(new FacetBucket(facetVal,expressionCalculator.getResults()));
});
Iterable<FacetBucket> facetResultsIter = applyOptions(facetResults);
final LinkedList<Map<String,Object>> results = new LinkedList<>();
// Export each expression in the bucket.
for (FacetBucket bucket : facetResultsIter) {
Map<String, Object> bucketMap = new HashMap<>();
bucketMap.put(AnalyticsResponseHeadings.PIVOT_NAME, name);
bucketMap.put(AnalyticsResponseHeadings.FACET_VALUE, bucket.getFacetValue());
bucketMap.put(AnalyticsResponseHeadings.RESULTS, bucket.getResults());
results.add(bucketMap);
}
return results;
}
}
/**
* A pivot node that has pivot children.
*/
public static class PivotBranch<T> extends PivotNode<PivotBranch.PivotDataPair<T>> {
private final PivotNode<T> childPivot;
public PivotBranch(String name, StringValueStream expression, PivotNode<T> childPivot) {
super(name, expression);
this.childPivot = childPivot;
}
@Override
public void setReductionCollectionManager(ReductionCollectionManager collectionManager) {
super.setReductionCollectionManager(collectionManager);
childPivot.setReductionCollectionManager(collectionManager);
}
@Override
public void setExpressionCalculator(ExpressionCalculator expressionCalculator) {
super.setExpressionCalculator(expressionCalculator);
childPivot.setExpressionCalculator(expressionCalculator);
}
@Override
public void accept(String pivotValue) {
PivotDataPair<T> pivotData = currentPivot.get(pivotValue);
if (pivotData == null) {
pivotData = new PivotDataPair<>();
pivotData.childPivots = new HashMap<>();
pivotData.pivotReduction = collectionManager.newDataCollectionTarget();
currentPivot.put(pivotValue, pivotData);
} else {
collectionManager.addCollectTarget(pivotData.pivotReduction);
}
childPivot.addFacetValueCollectionTargets(pivotData.childPivots);
}
@Override
protected void importPivotValue(DataInput input, String pivotValue) throws IOException {
PivotDataPair<T> pivotData = currentPivot.get(pivotValue);
if (pivotData == null) {
pivotData = new PivotDataPair<>();
pivotData.childPivots = new HashMap<>();
pivotData.pivotReduction = collectionManager.newDataCollectionIO();
currentPivot.put(pivotValue, pivotData);
} else {
collectionManager.prepareReductionDataIO(pivotData.pivotReduction);
}
collectionManager.mergeData();
childPivot.importPivot(input, pivotData.childPivots);
}
@Override
protected void exportPivotValue(DataOutput output, PivotDataPair<T> pivotData) throws IOException {
collectionManager.prepareReductionDataIO(pivotData.pivotReduction);
collectionManager.exportData();
childPivot.exportPivot(output, pivotData.childPivots);
}
@Override
public Iterable<Map<String,Object>> getPivotedResponse(Map<String,PivotDataPair<T>> pivot) {
final List<FacetBucket> facetResults = new ArrayList<>();
pivot.forEach((facetVal, dataPair) -> {
collectionManager.setData(dataPair.pivotReduction);
facetResults.add(new FacetBucket(facetVal,expressionCalculator.getResults()));
});
Iterable<FacetBucket> facetResultsIter = applyOptions(facetResults);
final LinkedList<Map<String,Object>> results = new LinkedList<>();
// Export each expression in the bucket.
for (FacetBucket bucket : facetResultsIter) {
Map<String, Object> bucketMap = new HashMap<>();
bucketMap.put(AnalyticsResponseHeadings.PIVOT_NAME, name);
bucketMap.put(AnalyticsResponseHeadings.FACET_VALUE, bucket.getFacetValue());
bucketMap.put(AnalyticsResponseHeadings.RESULTS, bucket.getResults());
bucketMap.put(AnalyticsResponseHeadings.PIVOT_CHILDREN, childPivot.getPivotedResponse(pivot.get(bucket.getFacetValue()).childPivots));
results.add(bucketMap);
}
return results;
}
/**
* Contains pivot data for {@link PivotNode.PivotBranch} classes.
*/
protected static class PivotDataPair<T> {
ReductionDataCollection pivotReduction;
Map<String,T> childPivots;
}
}
}