blob: c9d9a7358ac47fa4d2523c57d5408325b8e6e782 [file] [log] [blame]
# SPDX-License-Identifier: Apache-2.0
import json
from nifiapi.properties import PropertyDependency, PropertyDescriptor, StandardValidators
ROW_ORIENTED = "Row-Oriented"
TEXT = "Text"
COLUMN_ORIENTED = "Column-Oriented"
OUTPUT_STRATEGY = PropertyDescriptor(
name="Output Strategy",
description="""Specifies whether the output should contain only the text of the documents (each document separated by \\n\\n), or if it
should be formatted as either single column-oriented JSON object,
consisting of a keys 'ids', 'embeddings', 'documents', 'distances', and 'metadatas'; or if the results should be row-oriented,
a JSON per line, each consisting of a single id, document, metadata, embedding, and distance.""",
allowable_values=[ROW_ORIENTED, TEXT, COLUMN_ORIENTED],
default_value=ROW_ORIENTED,
required=True,
)
RESULTS_FIELD = PropertyDescriptor(
name="Results Field",
description="""If the input FlowFile is JSON Formatted, this represents the name of the field to insert the results. This allows the results to be inserted into
"an existing input in order to enrich it. If this property is unset, the results will be written to the FlowFile contents, overwriting any pre-existing content.""",
validators=[StandardValidators.NON_EMPTY_VALIDATOR],
required=False,
)
INCLUDE_IDS = PropertyDescriptor(
name="Include Document IDs",
description="Whether or not to include the Documents' IDs in the response",
allowable_values=["true", "false"],
default_value="true",
required=False,
dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
)
INCLUDE_METADATAS = PropertyDescriptor(
name="Include Metadata",
description="Whether or not to include the Documents' Metadata in the response",
allowable_values=["true", "false"],
default_value="true",
required=False,
dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
)
INCLUDE_DOCUMENTS = PropertyDescriptor(
name="Include Document",
description="Whether or not to include the Documents' Text in the response",
allowable_values=["true", "false"],
default_value="true",
required=False,
dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
)
INCLUDE_DISTANCES = PropertyDescriptor(
name="Include Distances",
description="Whether or not to include the Documents' Distances (i.e., how far the Document was away from the query) in the response",
allowable_values=["true", "false"],
default_value="true",
required=False,
dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
)
INCLUDE_EMBEDDINGS = PropertyDescriptor(
name="Include Embeddings",
description="Whether or not to include the Documents' Embeddings in the response",
allowable_values=["true", "false"],
default_value="false",
required=False,
dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
)
class QueryUtils:
context = None
def __init__(self, context):
self.context = context
self.results_field = context.getProperty(RESULTS_FIELD).getValue()
self.output_strategy = context.getProperty(OUTPUT_STRATEGY).getValue()
ids_property = context.getProperty(INCLUDE_IDS)
self.include_ids = ids_property.asBoolean() if ids_property else False
embeddings_property = context.getProperty(INCLUDE_EMBEDDINGS)
self.include_embeddings = embeddings_property.asBoolean() if embeddings_property else False
self.include_distances = context.getProperty(INCLUDE_DISTANCES).asBoolean()
documents_property = context.getProperty(INCLUDE_DOCUMENTS)
self.include_documents = documents_property.asBoolean() if documents_property else True
self.include_metadatas = context.getProperty(INCLUDE_METADATAS).asBoolean()
def create_json(self, flowfile, documents, metadatas, embeddings, distances, ids) -> tuple[str, str]:
input_json = None if self.results_field is None else json.loads(flowfile.getContentsAsBytes().decode())
if self.output_strategy == TEXT:
# Delete any document that is None or an empty-string
documents = [doc for doc in documents if doc is not None and doc != ""]
# Join the documents with two newlines
text = "\n\n".join(documents)
# Create either JSON or text output, based on whether or not an results field was specified
if input_json is None:
mime_type = "text/plain"
output_contents = text
else:
input_json[self.results_field] = text
output_contents = json.dumps(input_json)
mime_type = "application/json"
elif self.output_strategy == COLUMN_ORIENTED:
doc = {}
if self.include_ids:
doc["ids"] = ids
if self.include_distances:
doc["distances"] = distances
if self.include_documents:
doc["documents"] = documents
if self.include_metadatas:
doc["metadatas"] = metadatas
if self.include_embeddings:
doc["embeddings"] = embeddings
# Create the JSON from the Document
if input_json is None:
output_contents = json.dumps(doc)
else:
input_json[self.results_field] = doc
output_contents = json.dumps(input_json)
mime_type = "application/json"
else:
# Build the Documents
docs = []
count = len(ids) if ids else len(documents)
for i in range(count):
doc_id = None if ids is None else ids[i]
distance = None if distances is None else distances[i]
metadata = None if metadatas is None else metadatas[i]
document = None if documents is None else documents[i]
embedding = None if embeddings is None else embeddings[i]
# Create the document but do not include any key that we don't want to include in the output.
doc = {}
if self.include_ids:
doc["id"] = doc_id
if self.include_distances:
doc["distance"] = distance
if self.include_documents:
doc["document"] = document
if self.include_metadatas:
doc["metadata"] = metadata
if self.include_embeddings:
doc["embedding"] = embedding
docs.append(doc)
# If input_json is None, we just create JSON based on the Documents.
# If input_json is populated, we insert the documents into the input JSON using the specified key.
if input_json is None:
jsons = []
for doc in docs:
jsons.append(json.dumps(doc))
output_contents = "\n".join(jsons)
else:
input_json[self.results_field] = docs
output_contents = json.dumps(input_json)
mime_type = "application/json"
return output_contents, mime_type