= Stream Source Reference
:toclevels: 1
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
== search
The `search` function searches a SolrCloud collection and emits a stream of tuples that match the query. This is very similar to a standard Solr query, and uses many of the same parameters.
This expression allows you to specify a request handler using the `qt` parameter. By default, the `/select` handler is used. The `/select` handler can be used for simple rapid prototyping of expressions. For production, however, you will most likely want to use the `/export` handler, which is designed to `sort` and `export` entire result sets. The `/export` handler is not used by default because it has stricter requirements than the `/select` handler, so it's not as easy to get started with. To read more about the `/export` handler requirements, review the section <<exporting-result-sets.adoc#,Exporting Result Sets>>.
=== search Parameters
* `collection`: (Mandatory) the collection being searched.
* `q`: (Mandatory) The query to perform on the Solr index.
* `fl`: (Mandatory) The list of fields to return.
* `sort`: (Mandatory) The sort criteria.
* `zkHost`: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
* `qt`: Specifies the query type, or request handler, to use. Set this to `/export` to work with large result sets. The default is `/select`.
* `rows`: (Mandatory with the `/select` handler) The rows parameter specifies how many rows to return. This parameter is only needed with the `/select` handler (which is the default) since the `/export` handler always returns all rows.
* `partitionKeys`: Comma delimited list of keys to partition the search results by. To be used with the parallel function for parallelizing operations across worker nodes. See the <<stream-decorator-reference.adoc#parallel,parallel>> function for details.
=== search Syntax
[source,text]
----
expr=search(collection1,
zkHost="localhost:9983",
qt="/export",
q="*:*",
fl="id,a_s,a_i,a_f",
sort="a_f asc, a_i asc")
----
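Expressions are passed to Solr in the `expr` request parameter, as the example above shows. The following Python sketch illustrates one way to URL-encode an expression for an HTTP request. It assumes a Solr node at `localhost:8983` and a `/stream` handler path under `collection1`; the helper name `stream_url` is hypothetical and not part of Solr.

[source,python]
----
from urllib.parse import urlencode, urlsplit, parse_qs

def stream_url(base_url, collection, expr):
    """Build a URL that carries a streaming expression in the `expr` parameter."""
    query = urlencode({"expr": expr})  # percent-encodes quotes, spaces, '*', ':' ...
    return f"{base_url}/{collection}/stream?{query}"

# The expression from the syntax example above, written on one line.
expr = ('search(collection1, zkHost="localhost:9983", qt="/export", q="*:*", '
        'fl="id,a_s,a_i,a_f", sort="a_f asc, a_i asc")')

# Assumed host, port, and collection name.
url = stream_url("http://localhost:8983/solr", "collection1", expr)

# Decoding the query string gives back the original expression.
decoded = parse_qs(urlsplit(url).query)["expr"][0]
print(url)
----

Encoding the whole expression as a single parameter value avoids problems with the quotes, commas, and spaces that expressions normally contain.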
== jdbc
The `jdbc` function searches a JDBC datasource and emits a stream of tuples representing the JDBC result set. Each row in the result set is translated into a tuple and each tuple contains all the cell values for that row.
=== jdbc Parameters
* `connection`: (Mandatory) JDBC formatted connection string to whatever driver you are using.
* `sql`: (Mandatory) query to pass off to the JDBC endpoint
* `sort`: (Mandatory) The sort criteria indicating how the data coming out of the JDBC stream is sorted
* `driver`: The name of the JDBC driver used for the connection. If provided, an attempt is made to load the driver class into the JVM. If not provided, the driver is assumed to be loaded already. Some drivers require explicit loading, so this option is provided.
* `[driverProperty]`: One or more properties to pass to the JDBC driver during connection. The format is `propertyName="propertyValue"`. You can provide as many of these properties as you'd like and they will all be passed to the connection.
=== Connections and Drivers
Because some JDBC drivers require explicit loading the `driver` parameter can be used to provide the driver class name. If provided, then during stream construction the driver will be loaded. If the driver cannot be loaded because the class is not found on the classpath, then stream construction will fail.
When the JDBC stream is opened it will validate that a driver can be found for the provided connection string. If a driver cannot be found (because it hasn't been loaded) then the open will fail.
=== Datatypes
Due to the inherent differences in datatypes across JDBC sources the following datatypes are supported. The table indicates what Java type will be used for a given JDBC type. Types marked as requiring conversion will go through a conversion for each value of that type. For performance reasons the cell data types are only considered when the stream is opened as this is when the converters are created.
[width="100%",options="header",]
|===
|JDBC Type |Java Type |Requires Conversion
|String |String |No
|Short |Long |Yes
|Integer |Long |Yes
|Long |Long |No
|Float |Double |Yes
|Double |Double |No
|Boolean |Boolean |No
|===
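The table can be read as a lookup that is consulted once per column when the stream is opened. The following Python sketch models that idea only: the names `JDBC_TO_JAVA`, `build_converters`, and `to_tuple` are hypothetical, and Python's `int` and `float` stand in for Java's `Long` and `Double`.

[source,python]
----
# Mapping taken from the table above: JDBC type -> (Java type, requires conversion).
JDBC_TO_JAVA = {
    "String": ("String", False),
    "Short": ("Long", True),
    "Integer": ("Long", True),
    "Long": ("Long", False),
    "Float": ("Double", True),
    "Double": ("Double", False),
    "Boolean": ("Boolean", False),
}

def build_converters(column_types):
    """Choose one converter per column, once, as happens when the stream is opened."""
    converters = []
    for jdbc_type in column_types:
        java_type, needs_conversion = JDBC_TO_JAVA[jdbc_type]
        if not needs_conversion:
            converters.append(lambda value: value)  # value is passed through as-is
        elif java_type == "Long":
            converters.append(int)    # stands in for widening to a Java Long
        else:
            converters.append(float)  # stands in for widening to a Java Double
    return converters

def to_tuple(column_names, converters, row):
    """Translate one result-set row into a tuple (a dict in this sketch)."""
    return {name: convert(value)
            for name, convert, value in zip(column_names, converters, row)}

names = ["NAME", "AGE", "SCORE"]
converters = build_converters(["String", "Integer", "Float"])
print(to_tuple(names, converters, ("Ann", 31, 4.5)))
----

Because the converters are chosen up front, reading each row costs one function call per cell and no repeated type inspection.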
=== jdbc Syntax
A basic `jdbc` expression:
[source,text]
----
jdbc(
connection="jdbc:hsqldb:mem:.",
sql="select NAME, ADDRESS, EMAIL, AGE from PEOPLE where AGE > 25 order by AGE, NAME DESC",
sort="AGE asc, NAME desc",
driver="org.hsqldb.jdbcDriver"
)
----
A `jdbc` expression that passes a property to the driver:
[source,text]
----
// get_column_name is a property to pass to the hsqldb driver
jdbc(
connection="jdbc:hsqldb:mem:.",
sql="select NAME as FIRST_NAME, ADDRESS, EMAIL, AGE from PEOPLE where AGE > 25 order by AGE, NAME DESC",
sort="AGE asc, NAME desc",
driver="org.hsqldb.jdbcDriver",
get_column_name="false"
)
----
== drill
The `drill` function is designed to support efficient high cardinality aggregation.
The `drill` function sends a request to the `export` handler in a specific collection which includes a streaming expression that the `export` handler applies to the sorted result set.
The `export` handler then emits the aggregated tuples.
The `drill` function reads and emits the aggregated tuples from each shard maintaining the sort order, but does not merge the aggregations.
Streaming expression functions can be wrapped around the `drill` function to
merge the aggregates.
=== drill Parameters
* `collection`: (Mandatory) the collection being searched.
* `q`: (Mandatory) The query to perform on the Solr index.
* `fl`: (Mandatory) The list of fields to return.
* `sort`: (Mandatory) The sort criteria.
* `expr`: The streaming expression that is sent to the export handler that operates over the sorted
result set. The `input()` function provides the stream of sorted tuples from the export handler (see examples below).
=== drill Syntax
Example 1: Basic drill syntax
[source,text]
----
drill(articles,
q="abstract:water",
fl="author",
sort="author asc",
rollup(input(), over="author", count(*)))
----
Example 2: A `rollup` wrapped around the `drill` function to sum the counts emitted from each shard.
[source,text]
----
rollup(drill(articles,
q="abstract:water",
fl="author",
sort="author asc",
rollup(input(), over="author", count(*))),
over="author",
sum(count(*)))
----
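The difference between the two examples can be modeled outside Solr. The Python sketch below uses made-up per-shard counts: `drill_merge` interleaves the sorted shard output without combining it, as described for `drill`, and `rollup_sum` plays the role of the outer `rollup`. Both function names and the sample data are hypothetical.

[source,python]
----
import heapq
from itertools import groupby

# Hypothetical per-shard output of rollup(input(), over="author", count(*)),
# already sorted by author on each shard.
shard1 = [("adams", 2), ("brown", 1), ("clark", 4)]
shard2 = [("brown", 3), ("clark", 1), ("davis", 5)]

def drill_merge(*shards):
    """Interleave sorted shard streams by author without combining counts."""
    return heapq.merge(*shards, key=lambda t: t[0])

def rollup_sum(stream):
    """Sum the per-shard counts for each author, as the outer rollup does."""
    for author, group in groupby(stream, key=lambda t: t[0]):
        yield author, sum(count for _, count in group)

merged = list(drill_merge(shard1, shard2))  # "brown" and "clark" appear twice
totals = list(rollup_sum(merged))           # one tuple per author
print(merged)
print(totals)
----

The outer aggregation only works because the merged stream stays sorted by the rollup key, so tuples for the same author arrive next to each other.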
== echo
The `echo` function returns a single tuple echoing its text parameter.
`Echo` is the simplest stream source designed to provide text to a text analyzing stream decorator.
=== echo Syntax
[source,text]
----
echo("Hello world")
----
== facet
The `facet` function provides aggregations that are rolled up over buckets. Under the covers the facet function pushes down the aggregation into the search engine using Solr's JSON Facet API. This provides sub-second performance for many use cases. The facet function is appropriate for use with a low to moderate number of distinct values in the bucket fields. To support high cardinality aggregations see the rollup function.
=== facet Parameters
* `collection`: (Mandatory) Collection the facets will be aggregated from.
* `q`: (Mandatory) The query to build the aggregations from.
* `buckets`: (Mandatory) Comma separated list of fields to rollup over. The comma separated list represents the dimensions in a multi-dimensional rollup.
* `bucketSorts`: (Mandatory) Comma separated list of sorts to apply to each dimension in the buckets parameters. Sorts can be on the computed metrics or on the bucket values.
* `rows`: (Default 10) The number of rows to return. '-1' will return all rows.
* `offset`: (Default 0) The offset in the result set to start from.
* `overfetch`: (Default 150) Over-fetching is used to provide accurate aggregations over high cardinality fields.
* `method`: The JSON facet API aggregation method.
* `bucketSizeLimit`: Sets the absolute number of rows to fetch. This is incompatible with rows, offset and overfetch. This value is applied to each dimension. '-1' will fetch all the buckets.
* `metrics`: List of metrics to compute for the buckets. Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `per(col, 50)`. The `per` metric calculates a percentile
for a numeric column and can be specified multiple times in the same facet function.
* `tiered`: (Default false) Flag governing whether the `facet` stream should parallelize JSON Facet requests to multiple Solr collections using a `plist` expression; this option only applies if the `collection` is an alias backed by multiple collections. If `tiered` is enabled, then a `rollup` expression is used internally to aggregate the metrics from multiple `facet` expressions into a single result; only `count`, `min`, `max`, `sum`, and `avg` metrics are supported. Client applications can enable / disable this globally by setting the `solr.facet.stream.tiered=true|false` system property. Tiered enabled will be the default in Solr 9.x and beyond.
=== facet Syntax
Example 1:
[source,text]
----
facet(collection1,
q="*:*",
buckets="a_s",
bucketSorts="sum(a_i) desc",
rows=100,
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
----
The example above shows a facet function with rollups over a single bucket, where the buckets are returned in descending order by the calculated value of the `sum(a_i)` metric.
Example 2:
[source,text]
----
facet(collection1,
q="*:*",
buckets="year_i, month_i, day_i",
bucketSorts="year_i desc, month_i desc, day_i desc",
rows=10,
offset=20,
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
----
The example above shows a `facet` function with rollups over three buckets, where the buckets are returned in descending order by bucket value.
The `rows` parameter returns 10 rows and the `offset` parameter skips the first 20 rows, so the results start at the 21st row.
== features
The `features` function extracts the key terms from a text field in a classification training set stored in a SolrCloud collection. It uses an algorithm known as *Information Gain* to select the important terms from the training set. The `features` function was designed to work specifically with the <<train,train>> function, which uses the extracted features to train a text classifier.
The `features` function is designed to work with a training set that provides both positive and negative examples of a class. It emits a tuple for each feature term that is extracted along with the inverse document frequency (IDF) for the term in the training set.
The `features` function uses a query to select the training set from a collection. The IDF for each selected feature is calculated relative to the training set matching the query. This allows multiple training sets to be stored in the same SolrCloud collection without polluting the IDF across training sets.
=== features Parameters
* `collection`: (Mandatory) The collection that holds the training set
* `q`: (Mandatory) The query that defines the training set. The IDF for the features will be generated specific to the result set matching the query.
* `featureSet`: (Mandatory) The name of the feature set. This can be used to retrieve the features if they are stored in a SolrCloud collection.
* `field`: (Mandatory) The text field to extract the features from.
* `outcome`: (Mandatory) The field that defines the class, positive or negative
* `numTerms`: (Mandatory) How many feature terms to extract.
* `positiveLabel`: (defaults to 1) The value in the outcome field that defines a positive outcome.
=== features Syntax
[source,text]
----
features(collection1,
q="*:*",
featureSet="features1",
field="body",
outcome="out_i",
numTerms=250)
----
== cat
The `cat` function reads the specified files or directories and emits each line in the file(s) as a tuple.
Each emitted tuple contains two fields: `file` and `line`. `file` contains the path to the file being read from relative to the `userfiles` chroot (directly under `$SOLR_HOME`), and `line` contains a line in that file.
`cat` is ideally used with the `update` stream to index data from the specified documents, or with the `analyze` stream to further split the lines into individual tokens for statistical processing or visualization.
=== cat Parameters
* `filePaths`: (Mandatory) a comma separated list of filepaths to read lines from. If the specified path is a directory, it will be crawled recursively and all contained files will be read. To prevent malicious users from reading arbitrary files from Solr nodes, `filePaths` must be a relative path measured from a chroot of `$SOLR_HOME/userfiles` on the node running the streaming expression.
* `maxLines`: (defaults to -1) The maximum number of lines to read (and tuples to emit). If a negative value is specified, all lines in the specified files will be emitted as tuples. Files are read in the order that they appear in the comma-separated `filePaths` argument. If the line-limit is hit, it will be these later files that are partially emitted or not read at all.
=== cat Examples
The following example emits all lines from a single text file located at `$SOLR_HOME/userfiles/authors.txt`:
[source,text]
----
cat("authors.txt")
----
This example will read lines from `$SOLR_HOME/userfiles/authors.txt`, as well as all files (recursively) found under `$SOLR_HOME/userfiles/fiction/scifi`. Only 500 lines will be emitted, meaning that some files may be partially emitted or not read at all:
[source,text]
----
cat("authors.txt,fiction/scifi/", maxLines=500)
----
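The effect of `maxLines` on file order can be illustrated with a small Python model. This is a sketch under stated assumptions: `files` is a hypothetical in-memory stand-in for the `userfiles` directory, the function name `cat` merely mirrors the expression, and recursive directory crawling is not modeled.

[source,python]
----
def cat(files, file_paths, max_lines=-1):
    """Emit {file, line} tuples in filePaths order, stopping at max_lines.

    `files` maps a relative path to its list of lines (hypothetical stand-in
    for the userfiles directory). A negative max_lines means "no limit".
    """
    emitted = 0
    for path in file_paths.split(","):
        for line in files[path]:
            if 0 <= max_lines <= emitted:
                return  # limit reached: later lines and files are never read
            yield {"file": path, "line": line}
            emitted += 1

userfiles = {"authors.txt": ["a1", "a2", "a3"], "titles.txt": ["t1", "t2"]}

limited = list(cat(userfiles, "authors.txt,titles.txt", max_lines=4))
everything = list(cat(userfiles, "authors.txt,titles.txt"))
print([t["line"] for t in limited])
----

With a limit of 4, the first file is emitted completely and the second only partially, which matches the ordering rule described for `maxLines`.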
== nodes
The `nodes` function provides breadth-first graph traversal. For details, see the section <<graph-traversal.adoc#,Graph Traversal>>.
== knnSearch
The `knnSearch` function returns the k-nearest neighbors for a document based on text similarity. Under the covers the `knnSearch` function
uses the More Like This query parser plugin.
=== knnSearch Parameters
* `collection`: (Mandatory) The collection to perform the search in.
* `id`: (Mandatory) The id of the source document to begin the knn search from.
* `qf`: (Mandatory) The query field used to compare documents.
* `k`: (Mandatory) The number of nearest neighbors to return.
* `fl`: (Mandatory) The field list to return.
* `mindf`: (Optional) The minimum number of occurrences in the corpus to be included in the search.
* `maxdf`: (Optional) The maximum number of occurrences in the corpus to be included in the search.
* `minwl`: (Optional) The minimum word length to be included in the search.
* `maxwl`: (Optional) The maximum word length to be included in the search.
=== knnSearch Syntax
[source,text]
----
knnSearch(collection1,
id="doc1",
qf="text_field",
k="10",
fl="id, title",
mintf="3",
maxdf="10000000")
----
== model
The `model` function retrieves and caches logistic regression text classification models that are stored in a SolrCloud collection. The `model` function is designed to work with models that are created by the <<train,train function>>, but can also be used to retrieve text classification models trained outside of Solr, as long as they conform to the specified format. After the model is retrieved it can be used by the <<stream-decorator-reference.adoc#classify,classify function>> to classify documents.
A single model tuple is fetched and returned based on the *id* parameter. The model is retrieved by matching the *id* parameter with a model name in the index. If more than one iteration of the named model is stored in the index, the highest iteration is selected.
=== Caching with model
The `model` function has an internal LRU (least-recently-used) cache so models do not have to be retrieved with each invocation of the `model` function. The time to cache for each model ID can be passed as a parameter to the function call. Retrieving a cached model does not reset the time for expiring the model ID in the cache.
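The caching behavior described above, where a cache hit does not extend an entry's lifetime, can be sketched in Python. This is an illustrative model rather than Solr's implementation: the class `ModelCache`, its `capacity` limit, and the explicit `now` argument (used instead of a real clock) are all assumptions.

[source,python]
----
from collections import OrderedDict

class ModelCache:
    """LRU cache whose entries expire a fixed time after they were stored."""

    def __init__(self, capacity):
        self.capacity = capacity      # hypothetical size limit
        self.entries = OrderedDict()  # model id -> (model, expires_at)

    def put(self, model_id, model, cache_millis, now):
        self.entries[model_id] = (model, now + cache_millis)
        self.entries.move_to_end(model_id)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used entry

    def get(self, model_id, now):
        entry = self.entries.get(model_id)
        if entry is None:
            return None
        model, expires_at = entry
        if now >= expires_at:
            del self.entries[model_id]        # expired: the caller must refetch
            return None
        self.entries.move_to_end(model_id)    # LRU order changes, expiry does not
        return model

cache = ModelCache(capacity=2)
cache.put("myModel", {"name_s": "myModel"}, cache_millis=200000, now=0)
hit = cache.get("myModel", now=150000)   # still cached
miss = cache.get("myModel", now=200000)  # expired despite the earlier hit
print(hit, miss)
----

Keeping the expiry time fixed at insertion means a frequently used model is still refreshed from the collection at a predictable interval.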
=== Model Storage
The storage format of the models in Solr is below. The `train` function outputs the format below so you only need to know schema details if you plan to use the `model` function with logistic regression models trained outside of Solr.
* `name_s` (Single value, String, Stored): The name of the model.
* `iteration_i` (Single value, Integer, Stored): The iteration number of the model. Solr can store all iterations of the models generated by the train function.
* `terms_ss` (Multi value, String, Stored): The array of terms/features of the model.
* `weights_ds` (Multi value, double, Stored): The array of term weights. Each weight corresponds by array index to a term.
* `idfs_ds` (Multi value, double, Stored): The array of term IDFs (Inverse document frequency). Each IDF corresponds by array index to a term.
=== model Parameters
* `collection`: (Mandatory) The collection where the model is stored.
* `id`: (Mandatory) The id/name of the model. The model function always returns one model. If there are multiple iterations of the name, the highest iteration is returned.
* `cacheMillis`: (Optional) The amount of time to cache the model in the LRU cache.
=== model Syntax
[source,text]
----
model(modelCollection,
id="myModel",
cacheMillis="200000")
----
== random
The `random` function searches a SolrCloud collection and emits a pseudo-random set of results that match the query. Each invocation of random will return a different pseudo-random result set.
=== random Parameters
* `collection`: (Mandatory) The collection to search.
* `q`: (Mandatory) The query that selects the documents to sample from.
* `rows`: (Mandatory) The number of pseudo-random results to return.
* `fl`: (Mandatory) The field list to return.
* `fq`: (Optional) Filter query
=== random Syntax
[source,text]
----
random(baskets,
q="productID:productX",
rows="100",
fl="basketID")
----
In the example above the `random` function searches the `baskets` collection for all rows that match `productID:productX`. It returns 100 pseudo-random results. The field list returned is `basketID`.
== significantTerms
The `significantTerms` function queries a SolrCloud collection, but instead of returning documents, it returns significant terms found in documents in the result set. The `significantTerms` function scores terms based on how frequently they appear in the result set and how rarely they appear in the entire corpus. The `significantTerms` function emits a tuple for each term which contains the term, the score, the foreground count and the background count. The foreground count is the number of documents in the result set that contain the term. The background count is the number of documents in the entire corpus that contain the term. The foreground and background counts are global for the collection.
=== significantTerms Parameters
* `collection`: (Mandatory) The collection that the function is run on.
* `q`: (Mandatory) The query that describes the foreground document set.
* `field`: (Mandatory) The field to extract the terms from.
* `limit`: (Optional, Default 20) The max number of terms to return.
* `minDocFreq`: (Optional, Defaults to 5 documents) The minimum number of documents the term must appear in on a shard. This is a float value. If greater than 1.0, it is treated as an absolute number of documents. If less than 1.0, it is treated as a percentage of documents.
* `maxDocFreq`: (Optional, Defaults to 30% of documents) The maximum number of documents the term can appear in on a shard. This is a float value. If greater than 1.0, it is treated as an absolute number of documents. If less than 1.0, it is treated as a percentage of documents.
* `minTermLength`: (Optional, Default 4) The minimum length of the term to be considered significant.
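The dual meaning of `minDocFreq` and `maxDocFreq` can be shown with a short Python sketch. The function names are hypothetical, the bounds are assumed to be inclusive, and the treatment of a value of exactly 1.0 is an assumption because the descriptions above do not cover it.

[source,python]
----
def doc_freq_threshold(value, num_docs):
    """Turn a minDocFreq/maxDocFreq setting into an absolute document count."""
    if value < 1.0:
        return value * num_docs  # fraction of the documents on the shard
    # The parameter descriptions do not say how exactly 1.0 is treated;
    # this sketch assumes it is an absolute count.
    return value

def within_bounds(doc_freq, num_docs, min_doc_freq=5, max_doc_freq=0.3):
    """Check a term's document frequency against both limits (inclusive)."""
    low = doc_freq_threshold(min_doc_freq, num_docs)
    high = doc_freq_threshold(max_doc_freq, num_docs)
    return low <= doc_freq <= high

# A shard with 1000 documents, minDocFreq="10" and maxDocFreq=".20".
print(within_bounds(150, 1000, min_doc_freq=10, max_doc_freq=0.20))
print(within_bounds(9, 1000, min_doc_freq=10, max_doc_freq=0.20))
print(within_bounds(250, 1000, min_doc_freq=10, max_doc_freq=0.20))
----

The defaults in `within_bounds` mirror the documented defaults of 5 documents and 30% of documents.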
=== significantTerms Syntax
[source,text]
----
significantTerms(collection1,
q="body:Solr",
field="author",
limit="50",
minDocFreq="10",
maxDocFreq=".20",
minTermLength="5")
----
In the example above the `significantTerms` function is querying `collection1` and returning at most 50 significant terms from the `author` field that appear in 10 or more documents but not more than 20% of the corpus.
== shortestPath
The `shortestPath` function is an implementation of a shortest path graph traversal. The `shortestPath` function performs an iterative breadth-first search through an unweighted graph to find the shortest paths between two nodes in a graph. The `shortestPath` function emits a tuple for each path found. Each tuple emitted will contain a `path` key which points to a `List` of nodeIDs comprising the path.
=== shortestPath Parameters
* `collection`: (Mandatory) The collection that the shortest path search will be run on.
* `from`: (Mandatory) The nodeID to start the search from
* `to`: (Mandatory) The nodeID to end the search at
* `edge`: (Mandatory) Syntax: `from_field=to_field`. The `from_field` defines which field to search from. The `to_field` defines which field to search to. See example below for a detailed explanation.
* `threads`: (Optional: Default 6) The number of threads used to perform the partitioned join in the traversal.
* `partitionSize`: (Optional: Default 250) The number of nodes in each partition of the join.
* `fq`: (Optional) Filter query
* `maxDepth`: (Mandatory) Limits the search to a maximum depth in the graph.
=== shortestPath Syntax
[source,text]
----
shortestPath(collection,
from="john@company.com",
to="jane@company.com",
edge="from_address=to_address",
threads="6",
partitionSize="300",
fq="limiting query",
maxDepth="4")
----
The expression above performs a breadth-first search to find the shortest paths in an unweighted, directed graph.
The search starts from the nodeID "\john@company.com" in the `from_address` field and searches for the nodeID "\jane@company.com" in the `to_address` field. This search is performed iteratively until the `maxDepth` has been reached. Each level in the traversal is implemented as a parallel partitioned nested loop join across the entire collection. The `threads` parameter controls the number of threads performing the join at each level, while the `partitionSize` parameter controls the number of nodes in each join partition. The `maxDepth` parameter controls the number of levels to traverse. `fq` is a limiting query applied to each level in the traversal.
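The traversal itself is an ordinary breadth-first search. The Python sketch below shows the idea on a small in-memory graph; it does not model the partitioned joins, threads, or filter queries. The adjacency map `edges`, the function name `shortest_paths`, and the reading of `max_depth` as a maximum number of edges are assumptions made for illustration.

[source,python]
----
from collections import deque

def shortest_paths(edges, start, goal, max_depth):
    """Return every shortest path from start to goal within max_depth levels.

    `edges` is a hypothetical adjacency map (node -> list of nodes) standing in
    for the from_field=to_field lookups against the collection.
    """
    if start == goal:
        return [[start]]
    parents = {start: []}  # node -> predecessors that lie on a shortest path
    depth = {start: 0}
    frontier = deque([start])
    while frontier:
        node = frontier.popleft()
        if depth[node] == max_depth or node == goal:
            continue  # do not expand past the depth limit or past the goal
        for neighbor in edges.get(node, []):
            if neighbor not in depth:
                depth[neighbor] = depth[node] + 1
                parents[neighbor] = [node]
                frontier.append(neighbor)
            elif depth[neighbor] == depth[node] + 1:
                parents[neighbor].append(node)  # another equally short route
    if goal not in parents:
        return []

    def build(node):
        """Walk the predecessor links back to the start node."""
        if node == start:
            return [[start]]
        return [path + [node] for parent in parents[node] for path in build(parent)]

    return build(goal)

graph = {"john": ["ann", "bob"], "ann": ["jane"], "bob": ["jane"], "jane": []}
paths = shortest_paths(graph, "john", "jane", max_depth=4)
print(paths)
----

Each returned list corresponds to the `path` value of one emitted tuple. With `max_depth=1` the same call returns no paths, because the goal is two edges away.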
== shuffle
The `shuffle` expression sorts and exports entire result sets. The `shuffle` expression is similar to the `search` expression except that
under the covers `shuffle` always uses the /export handler. The `shuffle` expression is designed to be combined with the relational algebra
decorators that require complete, sorted result sets. Shuffled result sets can be partitioned across worker nodes with the parallel
stream decorator to perform parallel relational algebra. When used in parallel mode the partitionKeys parameter must be provided.
=== shuffle Parameters
* `collection`: (Mandatory) the collection being searched.
* `q`: (Mandatory) The query to perform on the Solr index.
* `fl`: (Mandatory) The list of fields to return.
* `sort`: (Mandatory) The sort criteria.
* `zkHost`: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
* `partitionKeys`: Comma delimited list of keys to partition the search results by. To be used with the parallel function for parallelizing operations across worker nodes. See the <<stream-decorator-reference.adoc#parallel,parallel>> function for details.
=== shuffle Syntax
[source,text]
----
shuffle(collection1,
q="*:*",
fl="id,a_s,a_i,a_f",
sort="a_f asc, a_i asc")
----
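The purpose of `partitionKeys` is that all tuples sharing the same key values reach the same worker. The following Python sketch illustrates that property with a simple hash. The CRC32 hash, the function names, and the sample tuples are assumptions for illustration and do not describe how Solr computes partitions.

[source,python]
----
import zlib

def worker_for(tuple_, partition_keys, num_workers):
    """Pick a worker by hashing the values of the partition keys."""
    key = "|".join(str(tuple_[k]) for k in partition_keys)
    return zlib.crc32(key.encode("utf-8")) % num_workers

def partition(tuples, partition_keys, num_workers):
    """Split a tuple stream into one list per worker, preserving stream order."""
    workers = [[] for _ in range(num_workers)]
    for t in tuples:
        workers[worker_for(t, partition_keys, num_workers)].append(t)
    return workers

# Hypothetical tuples, already sorted as shuffle would deliver them.
rows = [{"id": i, "a_s": s} for i, s in enumerate(["x", "x", "x", "y", "y", "z"])]
parts = partition(rows, ["a_s"], num_workers=3)
for number, part in enumerate(parts):
    print(number, part)
----

Because each worker sees every tuple for the keys assigned to it, and sees them in sorted order, operations that group or join on those keys can run independently on each worker.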
== stats
The `stats` function gathers simple aggregations for a search result set. The stats function does not support rollups over buckets, so the stats stream always returns a single tuple with the rolled up stats. Under the covers the stats function pushes down the generation of the stats into the search engine using the StatsComponent. The stats function currently supports the following metrics: `count(*)`, `sum()`, `avg()`, `min()`, and `max()`.
=== stats Parameters
* `collection`: (Mandatory) Collection the stats will be aggregated from.
* `q`: (Mandatory) The query to build the aggregations from.
* `metrics`: (Mandatory) The metrics to include in the result tuple. Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `per(col, 50)`. The `per` metric calculates a percentile
for a numeric column and can be specified multiple times in the same stats function.
=== stats Syntax
[source,text]
----
stats(collection1,
q=*:*,
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
----
== timeseries
The `timeseries` function builds a time series aggregation. Under the covers the `timeseries` function uses the
JSON Facet API as its high performance aggregation engine.
=== timeseries Parameters
* `collection`: (Mandatory) Collection the stats will be aggregated from.
* `q`: (Mandatory) The query to build the aggregations from.
* `field`: (Mandatory) The date field for the time series.
* `start`: (Mandatory) The start of the time series expressed in Solr date or date math syntax.
* `end`: (Mandatory) The end of the time series expressed in Solr date or date math syntax.
* `gap`: (Mandatory) The time gap between time series aggregation points expressed in Solr date math syntax.
* `format`: (Optional) Date template to format the date field in the output tuples. Formatting is performed by Java's SimpleDateFormat class.
* `metrics`: (Mandatory) The metrics to include in the result tuple. Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `per(col, 50)`. The `per` metric calculates a percentile
for a numeric column and can be specified multiple times in the same timeseries function.
=== timeseries Syntax
[source,text]
----
timeseries(collection1,
q=*:*,
field="rec_dt",
start="NOW-30DAYS",
end="NOW",
gap="+1DAY",
format="yyyy-MM-dd",
sum(a_i),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
per(a_f, 50),
per(a_f, 75),
count(*))
----
== train
The `train` function trains a Logistic Regression text classifier on a training set stored in a SolrCloud collection. It uses a parallel iterative, batch Gradient Descent approach to train the model. The training algorithm is embedded inside Solr so with each iteration only the model is streamed across the network.
The `train` function wraps a <<features,features>> function which provides the terms and inverse document frequency (IDF) used to train the model. The `train` function operates over the same training set as the `features` function, which includes both positive and negative examples of the class.
With each iteration the `train` function emits a tuple with the model. The model contains the feature terms, weights, and the confusion matrix for the model. The optimized model can then be used to classify documents based on their feature terms.
=== train Parameters
* `collection`: (Mandatory) Collection that holds the training set
* `q`: (Mandatory) The query that defines the training set. The IDF for the features will be generated specific to the result set matching the query.
* `name`: (Mandatory) The name of the model. This can be used to retrieve the model if it is stored in a SolrCloud collection.
* `field`: (Mandatory) The text field to extract the features from.
* `outcome`: (Mandatory) The field that defines the class, positive or negative
* `maxIterations`: (Mandatory) How many training iterations to perform.
* `positiveLabel`: (defaults to 1) The value in the outcome field that defines a positive outcome.
=== train Syntax
[source,text]
----
train(collection1,
features(collection1, q="*:*", featureSet="first", field="body", outcome="out_i", numTerms=250),
q="*:*",
name="model1",
field="body",
outcome="out_i",
maxIterations=100)
----
== topic
The `topic` function provides publish/subscribe messaging capabilities built on top of SolrCloud. The topic function allows users to subscribe to a query. The function then provides one-time delivery of new or updated documents that match the topic query. The initial call to the topic function establishes the checkpoints for the specific topic ID. Subsequent calls to the same topic ID will return documents added or updated after the initial checkpoint. Each run of the topic query updates the checkpoints for the topic ID. Setting the initialCheckpoint parameter to 0 will cause the topic to process all documents in the index that match the topic query.
[WARNING]
====
The topic function should be considered in beta until https://issues.apache.org/jira/browse/SOLR-8709[SOLR-8709] is committed and released.
====
=== topic Parameters
* `checkpointCollection`: (Mandatory) The collection where the topic checkpoints are stored.
* `collection`: (Mandatory) The collection that the topic query will be run on.
* `id`: (Mandatory) The unique ID for the topic. The checkpoints will be saved under this id.
* `q`: (Mandatory) The topic query.
* `fl`: (Mandatory) The field list returned by the topic function.
* `initialCheckpoint`: (Optional) Sets the initial Solr `\_version_` number to start reading from the queue. If not set, it defaults to the highest version in the index. Setting it to 0 will process all records in the index that match the query.
* `zkHost`: (Optional) Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
=== topic Syntax
[source,text]
----
topic(checkpointCollection,
collection,
id="uniqueId",
q="topic query",
fl="id, name, country")
----
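The checkpoint behavior of `topic` can be modeled with a few lines of Python. This is a simplified sketch: a single in-memory checkpoint stands in for the checkpoints stored in the checkpoint collection, documents are plain dicts, and the class name `Topic` and its `run` method are hypothetical.

[source,python]
----
class Topic:
    """Single-checkpoint model of a topic subscription."""

    def __init__(self, index, initial_checkpoint=None):
        self.index = index  # list of dicts with "id" and "_version_"
        if initial_checkpoint is None:
            # Default: start from the highest version currently in the index.
            initial_checkpoint = max((d["_version_"] for d in index), default=0)
        self.checkpoint = initial_checkpoint

    def run(self, matches=lambda doc: True):
        """Return matching documents newer than the checkpoint, then advance it."""
        new_docs = sorted((d for d in self.index
                           if d["_version_"] > self.checkpoint and matches(d)),
                          key=lambda d: d["_version_"])
        if new_docs:
            self.checkpoint = new_docs[-1]["_version_"]
        return new_docs

index = [{"id": "1", "_version_": 10}, {"id": "2", "_version_": 20}]
topic = Topic(index)
first = topic.run()    # nothing is newer than the initial checkpoint
index.append({"id": "3", "_version_": 30})
second = topic.run()   # only the newly added document
third = topic.run()    # already delivered, so nothing is returned
replay = Topic(index, initial_checkpoint=0).run()  # all documents in the index
print(first, second, third, len(replay))
----

The last call shows the effect of an initial checkpoint of 0: every matching document in the index is delivered once, after which the topic behaves like any other.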
== tuple
The `tuple` function emits a single tuple with name/value pairs.
The values can be set to variables assigned in a `let` expression, literals, stream evaluators or stream expressions.
In the case of stream evaluators the tuple will output the return value from the evaluator.
This could be a numeric, list, or map.
If a value is set to a stream expression, the `tuple` function will flatten
the tuple stream from the stream expression into a list of tuples.
=== tuple Parameters
* name=value pairs
=== tuple Syntax
[source,text]
----
tuple(a=add(1,1),
b=search(collection1, q="cat:a", fl="a, b, c", sort="a desc"))
----